[Ray] task based shuffle for ray #3040

Merged
merged 59 commits into from
Jun 27, 2022

Conversation

chaokunyang
Contributor

@chaokunyang chaokunyang commented May 17, 2022

What do these changes do?

This PR implements pull-based shuffle for the Ray task backend, which will be the basis for a later push-based shuffle on Ray. The main changes are:

  • Introduce ShuffleFetchType, which has two members: FETCH_BY_KEY and FETCH_BY_INDEX. When FETCH_BY_INDEX is used, source_keys/source_idxes/source_mappers are skipped. This makes the meta and scheduling cost on the supervisor much smaller, about 1000~10000x smaller for a big shuffle, so Mars can schedule large-scale shuffles, even PB-scale ones.
  • Add n_mappers/n_reducers to FetchShuffle, and skip source_keys/source_idxes/source_mappers when shuffle_fetch_type is FETCH_BY_INDEX.
  • Operand compatibility: make Mars operands compatible with FETCH_BY_INDEX.
    • Mappers may produce an indeterminate number of outputs: TensorReshape/TensorBincount.
    • Mappers may produce more shuffle blocks than num_reducer: PSRSAlign.
  • Introduce a new subtask dependency resolution mechanism, which also removes data_keys for subtask execution on Ray.
  • Add ShuffleManager for shuffle execution on Ray.
  • Skip recording shuffle operand meta in the supervisor.
  • Support all shuffle operands on Ray.
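
As a rough illustration of why index-based fetching shrinks the per-reducer meta, the sketch below contrasts the two addressing styles. Only the enum name and its two members come from the description above. The helper functions, the key format, and the row-major layout of mapper outputs are hypothetical and are not taken from the Mars code.

```python
from enum import Enum
from typing import List


class ShuffleFetchType(Enum):
    # Member names follow the PR description; the values are arbitrary.
    FETCH_BY_KEY = 0
    FETCH_BY_INDEX = 1


def reducer_inputs_by_key(n_mappers: int, reducer_idx: int) -> List[str]:
    """Hypothetical key-based meta: one explicit key per mapper for each reducer."""
    return [f"mapper-{m}-reducer-{reducer_idx}" for m in range(n_mappers)]


def reducer_inputs_by_index(n_mappers: int, n_reducers: int, reducer_idx: int) -> range:
    """Hypothetical index-based meta: positions in a row-major mapper output layout.

    Assumes every mapper emits exactly n_reducers blocks, in reducer order.
    """
    return range(reducer_idx, n_mappers * n_reducers, n_reducers)


by_key = reducer_inputs_by_key(n_mappers=4, reducer_idx=1)
by_index = reducer_inputs_by_index(n_mappers=4, n_reducers=3, reducer_idx=1)
print(len(by_key))     # 4: the stored list grows with the number of mappers
print(list(by_index))  # [1, 4, 7, 10]: derived on demand from three integers
```

Under these assumptions, the key-based form stores a list whose length grows with the number of mappers for every reducer, while the index-based form needs only n_mappers, n_reducers, and the reducer's own index. It also only works if no mapper block is missing.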

Before this PR, for a shuffle with 100000 partitions, the meta of every reducer subtask took about 16 MB of memory, so Mars could handle fewer than 1000 partitions, i.e. less than 1 TB of data.

With this PR, every reducer subtask takes only 2 KB of memory, so Mars can handle 8,000,000 partitions. If every partition holds 1 GB of data, this PR enables Mars to handle an 8 PB data shuffle.
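
The figures above can be checked with a few lines of arithmetic. This sketch assumes decimal units (1 KB = 10^3 bytes, and so on) and reads the quoted sizes as bytes. The variable names are illustrative only.

```python
# Decimal byte units (an assumption; the description does not say which convention it uses).
KB = 10**3
MB = 10**6
GB = 10**9
PB = 10**15

n_partitions = 8_000_000
partition_size = 1 * GB

# Total data volume if every partition holds 1 GB.
total_bytes = n_partitions * partition_size
print(total_bytes / PB)  # 8.0

# Per-reducer meta before and after, using the figures quoted above.
meta_before = 16 * MB
meta_after = 2 * KB
print(meta_before / meta_after)  # 8000.0
```

The ratio of 8000 falls inside the 1000~10000x range quoted in the first bullet, although the 16 MB figure was measured for a 100000-partition shuffle, so the comparison is only indicative.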

#2916
#3039
#2112

Check code requirements

  • tests added / passed (if needed)
  • Ensure all linting tests pass, see here for how to run them

@chaokunyang chaokunyang requested a review from a team as a code owner May 17, 2022 04:25
@chaokunyang chaokunyang changed the title Ray task based simple shuffle Task based shuffle support for ray May 17, 2022
@chaokunyang chaokunyang force-pushed the ray_dag_based_simple_shuffle branch from 73f466d to 8427dd7 Compare May 17, 2022 05:27
@chaokunyang chaokunyang changed the title Task based shuffle support for ray [Ray] task based shuffle support for ray May 17, 2022
@chaokunyang chaokunyang force-pushed the ray_dag_based_simple_shuffle branch from 37b50b3 to fd866b2 Compare May 17, 2022 07:05
@chaokunyang chaokunyang force-pushed the ray_dag_based_simple_shuffle branch from ec143bb to 0c32530 Compare May 18, 2022 09:21
…shuffle

# Conflicts:
#	mars/core/operand/shuffle.py
#	mars/services/task/analyzer/analyzer.py
#	mars/services/task/execution/api.py
#	mars/tests/test_utils.py
@chaokunyang chaokunyang changed the title [Ray] task based shuffle support for ray [Ray] task based shuffle for ray May 19, 2022
@chaokunyang chaokunyang force-pushed the ray_dag_based_simple_shuffle branch from 0148219 to 7e0d239 Compare June 17, 2022 13:50
@chaokunyang chaokunyang force-pushed the ray_dag_based_simple_shuffle branch from 85423b1 to 7f9e8e8 Compare June 19, 2022 15:48
Collaborator

@qinxuye qinxuye left a comment

I left some comments

@@ -112,3 +128,11 @@ def iter_mapper_data(
            ctx, input_id, mapper_id, pop=pop, skip_none=skip_none
        ):
            yield data

    def execute(self, ctx, op):
        """The mapper stage must ensure all mapper blocks are inserted into ctx and no blocks
Collaborator

To clarify, must no blocks be missing for both fetch by key and fetch by index, or just for fetch by index?

Contributor Author

Just for fetch_by_index

@chaokunyang
Contributor Author

Since this PR introduces build_fetch_shuffle, which doesn't exist on master, some asv benchmarks will fail.

@chaokunyang chaokunyang force-pushed the ray_dag_based_simple_shuffle branch from 28b4ad4 to e5aa38b Compare June 24, 2022 10:46
Contributor

@fyrestone fyrestone left a comment

LGTM

Collaborator

@qinxuye qinxuye left a comment

LGTM

@wjsi wjsi added this to the v0.10.0a2 milestone Jun 27, 2022
@wjsi wjsi merged commit 31bd6cc into mars-project:master Jun 27, 2022
5 participants