[Shuffle] Skip storing shuffle object refs to reduce meta overhead (mars-project#3209)

* disable autoscale-in during shuffle to skip shuffle meta

* fix remove mapper data

* refine autoscale-in

* fix SubtaskGraph add proxy chunks

* add shuffle tests to autoscale

* fix mapper chunks check

* remove unnecessary event

* refine proxy_subtasks check

* work around versioneer compatibility with PEP 660

* fix get autoscaler

* fix subtask graph building
chaokunyang authored Sep 6, 2022
1 parent 5604cea commit b2d658e
Showing 10 changed files with 102 additions and 36 deletions.
7 changes: 5 additions & 2 deletions mars/deploy/oscar/tests/test_ray_scheduling.py
@@ -293,15 +293,18 @@ def f(pdf, latch):
         await asyncio.sleep(1)
     # Test data on node of released worker can still be fetched
     pd_df = df.fetch()
-    groupby_sum_df = df.rechunk(chunk_size * 2).groupby("a").sum()
+    groupby_sum_df = (
+        df.rechunk(chunk_size * 2).groupby("a").apply(lambda pdf: pdf.sum())
+    )
     logger.info(groupby_sum_df.execute())
     while await autoscaler_ref.get_dynamic_worker_nums() > 1:
         dynamic_workers = await autoscaler_ref.get_dynamic_workers()
         logger.info(f"Waiting workers %s to be released.", dynamic_workers)
         await asyncio.sleep(1)
     assert df.to_pandas().to_dict() == pd_df.to_dict()
     assert (
-        groupby_sum_df.to_pandas().to_dict() == pd_df.groupby("a").sum().to_dict()
+        groupby_sum_df.to_pandas().to_dict()
+        == pd_df.groupby("a").apply(lambda pdf: pdf.sum()).to_dict()
     )


28 changes: 26 additions & 2 deletions mars/services/scheduling/api/oscar.py
@@ -25,13 +25,19 @@

 class SchedulingAPI(AbstractSchedulingAPI):
     def __init__(
-        self, session_id: str, address: str, manager_ref=None, queueing_ref=None
+        self,
+        session_id: str,
+        address: str,
+        manager_ref=None,
+        queueing_ref=None,
+        autoscaler_ref=None,
     ):
         self._session_id = session_id
         self._address = address
 
         self._manager_ref = manager_ref
         self._queueing_ref = queueing_ref
+        self._autoscaler = autoscaler_ref
 
     @classmethod
     @alru_cache
@@ -47,7 +53,16 @@ async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
             SubtaskQueueingActor.gen_uid(session_id), address=address
         )
 
-        scheduling_api = SchedulingAPI(session_id, address, manager_ref, queueing_ref)
+        from ...cluster import ClusterAPI
+        from ..supervisor.autoscale import AutoscalerActor
+
+        cluster_api = await ClusterAPI.create(address)
+        [autoscaler] = await cluster_api.get_supervisor_refs(
+            [AutoscalerActor.default_uid()]
+        )
+        scheduling_api = SchedulingAPI(
+            session_id, address, manager_ref, queueing_ref, autoscaler
+        )
         return scheduling_api
 
     async def get_subtask_schedule_summaries(
@@ -131,6 +146,15 @@ async def finish_subtasks(
"""
await self._manager_ref.finish_subtasks(subtask_ids, bands, schedule_next)

async def disable_autoscale_in(self):
"""Disable autoscale in"""
await self._autoscaler.disable_autoscale_in()

async def try_enable_autoscale_in(self):
"""Try to enable autoscale in, the autoscale-in will be enabled only when last call corresponding
`disable_autoscale_in` has been invoked."""
await self._autoscaler.try_enable_autoscale_in()


class MockSchedulingAPI(SchedulingAPI):
@classmethod
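The two methods form a reference-counted pair: every disable_autoscale_in must eventually be matched by exactly one try_enable_autoscale_in, including on failure. A minimal caller-side sketch of that contract — the helper name is hypothetical, and `api` is assumed to be an instance obtained via `SchedulingAPI.create`:

    async def with_autoscale_in_disabled(api, coro_func):
        # Guard a critical section against scale-in; the finally block
        # guarantees the autoscaler's counter returns to zero even on error.
        await api.disable_autoscale_in()
        try:
            return await coro_func()
        finally:
            await api.try_enable_autoscale_in()

The stage runner in mars/services/task/execution/mars/stage.py (further below) applies exactly this pattern around shuffle execution.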
26 changes: 22 additions & 4 deletions mars/services/scheduling/supervisor/autoscale.py
@@ -39,6 +39,7 @@ def __init__(self, autoscale_conf: Dict[str, Any]):
         self.queueing_refs = dict()
         self.global_resource_ref = None
         self._dynamic_workers: Set[str] = set()
+        self._autoscale_in_disable_counter = 0
 
     async def __post_create__(self):
         strategy = self._autoscale_conf.get("strategy")
@@ -95,19 +96,33 @@ async def request_worker(
             time.time() - start_time,
         )
 
-    async def release_workers(self, addresses: List[str]):
+    async def disable_autoscale_in(self):
+        self._autoscale_in_disable_counter += 1
+        if self._enabled:
+            logger.info("Disabled autoscale_in")
+
+    async def try_enable_autoscale_in(self):
+        self._autoscale_in_disable_counter -= 1
+        if self._autoscale_in_disable_counter == 0 and self._enabled:
+            logger.info("Enabled autoscale_in")
+
+    async def release_workers(self, addresses: List[str]) -> List[str]:
         """
         Release a group of worker nodes.
         Parameters
         ----------
        addresses : List[str]
-            The addresses of the specified noded.
+            The addresses of the specified node.
         """
+        if self._autoscale_in_disable_counter > 0:
+            return []
         workers_bands = {
             address: await self.get_worker_bands(address) for address in addresses
         }
         logger.info(
-            "Start to release workers %s which have bands %s.", addresses, workers_bands
+            "Start to release workers %s which have bands %s.",
+            addresses,
+            workers_bands,
         )
         for address in addresses:
             await self._cluster_api.set_node_status(
@@ -136,6 +151,7 @@ async def release_worker(address):
         # is not being releasing.
         for address in addresses:
             await release_worker(address)
+        return addresses
 
     def get_dynamic_workers(self) -> Set[str]:
         return self._dynamic_workers
@@ -405,7 +421,9 @@ async def _scale_in(self):
                 idle_bands,
             )
             try:
-                await self._autoscaler.release_workers(worker_addresses)
+                worker_addresses = await self._autoscaler.release_workers(
+                    worker_addresses
+                )
                 logger.info(
                     "Finished offline workers %s in %.4f seconds",
                     worker_addresses,
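Because the guard is a counter rather than a boolean flag, overlapping shuffle stages keep scale-in blocked until the last one finishes. A toy model of just that semantics (a standalone sketch, not the actor implementation):

    class ScaleInGuard:
        def __init__(self):
            self._counter = 0

        def disable(self):
            self._counter += 1

        def try_enable(self):
            self._counter -= 1

        def release_workers(self, addresses):
            # Mirrors the early return above: nothing is released
            # while any stage still holds the guard.
            return [] if self._counter > 0 else addresses

    guard = ScaleInGuard()
    guard.disable()  # stage A starts a shuffle
    guard.disable()  # stage B starts another shuffle
    guard.try_enable()  # stage A finishes
    assert guard.release_workers(["w1"]) == []  # still blocked by B
    guard.try_enable()  # stage B finishes
    assert guard.release_workers(["w1"]) == ["w1"]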
6 changes: 4 additions & 2 deletions mars/services/scheduling/supervisor/service.py
@@ -117,10 +117,12 @@ async def create_session(self, session_id: str):
             uid=SubtaskManagerActor.gen_uid(session_id),
         )
 
+        from ...cluster import ClusterAPI
         from .autoscale import AutoscalerActor
 
-        autoscaler_ref = await mo.actor_ref(
-            AutoscalerActor.default_uid(), address=self._address
+        cluster_api = await ClusterAPI.create(self._address)
+        [autoscaler_ref] = await cluster_api.get_supervisor_refs(
+            [AutoscalerActor.default_uid()]
         )
         await autoscaler_ref.register_session(session_id, self._address)
 
13 changes: 13 additions & 0 deletions mars/services/subtask/core.py
@@ -192,6 +192,10 @@ class SubtaskGraph(DAG, Iterable[Subtask]):
     Subtask graph.
     """
 
+    def __init__(self):
+        super().__init__()
+        self._proxy_subtasks = []
+
     @classmethod
     def _extract_operands(cls, node: Subtask):
         from ...core.operand import Fetch, FetchShuffle
@@ -200,3 +204,12 @@ def _extract_operands(cls, node: Subtask):
             if isinstance(node.op, (Fetch, FetchShuffle)):
                 continue
             yield node.op
+
+    def add_shuffle_proxy_subtask(self, proxy_subtask):
+        self._proxy_subtasks.append(proxy_subtask)
+
+    def num_shuffles(self) -> int:
+        return len(self._proxy_subtasks)
+
+    def get_shuffle_proxy_subtasks(self):
+        return self._proxy_subtasks
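
A sketch of how this bookkeeping is meant to be consumed — the analyzer (below) registers proxies at build time; `subtask` and `is_shuffle_proxy` here are stand-ins for values it computes:

    graph = SubtaskGraph()
    graph.add_node(subtask)
    if is_shuffle_proxy:
        graph.add_shuffle_proxy_subtask(subtask)

    # Executors can now ask whether a stage contains shuffles without
    # rescanning every subtask's chunk graph:
    if graph.num_shuffles() > 0:
        proxies = graph.get_shuffle_proxy_subtasks()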
3 changes: 2 additions & 1 deletion mars/services/subtask/worker/processor.py
@@ -373,7 +373,8 @@ async def _store_meta(
                 mapper_keys = get_mapper_data_keys(chunk_key, data_key_to_store_size)
                 store_size = sum(data_key_to_store_size[k] for k in mapper_keys)
                 memory_size = sum(data_key_to_memory_size[k] for k in mapper_keys)
-                object_ref = [data_key_to_object_id[k] for k in mapper_keys]
+                # Skip meta for shuffle
+                object_ref = None
         # for worker, if chunk in update_meta_chunks
         # save meta including dtypes_value etc, otherwise,
         # save basic meta only
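Dropping the refs is the core of the meta-overhead fix: only the aggregated store/memory sizes of the mapper outputs are reported to the supervisor, while the per-mapper object refs stay on the workers and reducers fetch them by index through the shuffle manager. An illustrative before/after of the payload (simplified field names, not the exact chunk-meta schema):

    # Before: one object ref per mapper output shipped to supervisor meta.
    meta = dict(store_size=store_size, memory_size=memory_size,
                object_refs=[data_key_to_object_id[k] for k in mapper_keys])
    # After: sizes alone are kept; refs for shuffle data are skipped.
    meta = dict(store_size=store_size, memory_size=memory_size,
                object_refs=None)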
26 changes: 11 additions & 15 deletions mars/services/task/analyzer/analyzer.py
@@ -167,7 +167,7 @@ def _gen_subtask_info(
         chunk_to_subtask: Dict[ChunkType, Subtask],
         chunk_to_bands: Dict[ChunkType, BandType],
         chunk_to_fetch_chunk: Dict[ChunkType, ChunkType],
-    ) -> Tuple[Subtask, List[Subtask]]:
+    ) -> Tuple[Subtask, List[Subtask], bool]:
         # gen subtask and its input subtasks
         chunks_set = set(chunks)
         result_chunks = []
@@ -315,19 +315,13 @@ def _gen_subtask_info(
             extra_config=self._extra_config,
         )
 
-        if (
-            self._has_shuffle
-            and self._shuffle_fetch_type == ShuffleFetchType.FETCH_BY_INDEX
-        ):
-            shuffle_chunks = [
-                c for c in result_chunks if isinstance(c, MapReduceOperand)
-            ]
-            # ensure no shuffle mapper chunks fused into same subtask and subtask only produce mapper outputs
-            # which can be merged dynamically by the reducers.
-            assert len(shuffle_chunks) <= 1, shuffle_chunks
-            proxy_chunks = [c for c in chunks if isinstance(c.op, ShuffleProxy)]
-            assert len(proxy_chunks) <= 1, proxy_chunks
-        return subtask, inp_subtasks
+        is_shuffle_proxy = False
+        if self._has_shuffle:
+            proxy_chunks = [c for c in result_chunks if isinstance(c.op, ShuffleProxy)]
+            if proxy_chunks:
+                assert len(proxy_chunks) <= 1, proxy_chunks
+                is_shuffle_proxy = True
+        return subtask, inp_subtasks, is_shuffle_proxy
 
     def _gen_logic_key(self, chunks: List[ChunkType]):
         return tokenize(
@@ -469,13 +463,15 @@ def gen_subtask_graph(
             if all(isinstance(c.op, Fetch) for c in same_color_chunks):
                 # all fetch ops, no need to gen subtask
                 continue
-            subtask, inp_subtasks = self._gen_subtask_info(
+            subtask, inp_subtasks, is_shuffle_proxy = self._gen_subtask_info(
                 same_color_chunks,
                 chunk_to_subtask,
                 chunk_to_bands,
                 chunk_to_fetch_chunk,
             )
             subtask_graph.add_node(subtask)
+            if is_shuffle_proxy:
+                subtask_graph.add_shuffle_proxy_subtask(subtask)
             logic_key_to_subtasks[subtask.logic_key].append(subtask)
             for inp_subtask in inp_subtasks:
                 subtask_graph.add_edge(inp_subtask, subtask)
4 changes: 3 additions & 1 deletion mars/services/task/analyzer/tests/test_analyzer.py
@@ -27,9 +27,10 @@
 t2 = t1.reshape(27, 31)
 t2.op.extra_params["_reshape_with_shuffle"] = True
 df1 = md.DataFrame(t1, columns=[f"c{i}" for i in range(t1.shape[1])])
+df2 = df1.groupby(["c1"]).apply(lambda pdf: pdf.sum())
 
 
-@pytest.mark.parametrize("tileable", [df1.describe(), t2])
+@pytest.mark.parametrize("tileable", [df1.describe(), df2, t2])
 @pytest.mark.parametrize("fuse", [True, False])
 def test_shuffle_graph(tileable, fuse):
     # can't test df.groupby and mt.bincount, those chunk graph build depend on ctx.get_chunks_meta/get_chunks_result
@@ -61,6 +62,7 @@ def test_shuffle_graph(tileable, fuse):
     ]
     assert len(proxy_subtasks) == len(proxy_chunks)
     assert len(proxy_subtasks) > 0
+    assert len(proxy_subtasks) == len(subtask_graph.get_shuffle_proxy_subtasks())
     for proxy_chunk, proxy_subtask in zip(proxy_chunks, proxy_subtasks):
         reducer_subtasks = subtask_graph.successors(proxy_subtask)
         for reducer_subtask in reducer_subtasks:
11 changes: 11 additions & 0 deletions mars/services/task/execution/mars/stage.py
@@ -223,6 +223,17 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
         )
 
     async def run(self):
+        try:
+            if self.subtask_graph.num_shuffles() > 0:
+                # disable scale-in when shuffle is executing so that we can skip
+                # store shuffle meta in supervisor.
+                await self._scheduling_api.disable_autoscale_in()
+            return await self._run()
+        finally:
+            if self.subtask_graph.num_shuffles() > 0:
+                await self._scheduling_api.try_enable_autoscale_in()
+
+    async def _run(self):
         if len(self.subtask_graph) == 0:
             # no subtask to schedule, set status to done
             self._schedule_done()
14 changes: 5 additions & 9 deletions mars/services/task/execution/ray/shuffle.py
@@ -15,9 +15,9 @@
 import numpy as np
 from typing import List, Iterable
 
-from .....core.operand import ShuffleProxy, MapReduceOperand, OperandStage
+from .....core.operand import MapReduceOperand, OperandStage
 from .....utils import lazy_import
-from ....subtask import Subtask
+from ....subtask import Subtask, SubtaskGraph
 
 ray = lazy_import("ray")
 

@@ -27,14 +27,10 @@ class ShuffleManager:
     mapper and reducer index.
     """
 
-    def __init__(self, subtask_graph):
+    def __init__(self, subtask_graph: SubtaskGraph):
         self.subtask_graph = subtask_graph
-        self._proxy_subtasks = []
-        for subtask in subtask_graph:
-            chunk = subtask.chunk_graph.results[0]
-            if isinstance(chunk.op, ShuffleProxy):
-                self._proxy_subtasks.append(subtask)
-        self.num_shuffles = len(self._proxy_subtasks)
+        self._proxy_subtasks = subtask_graph.get_shuffle_proxy_subtasks()
+        self.num_shuffles = subtask_graph.num_shuffles()
         self.mapper_output_refs = []
         self.mapper_indices = {}
         self.reducer_indices = {}
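With the proxies supplied by the graph, ShuffleManager only has to assign each mapper and reducer subtask a (shuffle_index, ordinal) coordinate. A simplified sketch of that indexing scheme — the real class also allocates mapper_output_refs, and stable predecessor/successor ordering is assumed:

    def build_shuffle_indices(subtask_graph):
        mapper_indices, reducer_indices = {}, {}
        for shuffle_index, proxy in enumerate(
            subtask_graph.get_shuffle_proxy_subtasks()
        ):
            # Mappers are the proxy's predecessors, reducers its successors.
            for i, mapper in enumerate(subtask_graph.predecessors(proxy)):
                mapper_indices[mapper] = (shuffle_index, i)
            for j, reducer in enumerate(subtask_graph.successors(proxy)):
                reducer_indices[reducer] = (shuffle_index, j)
        return mapper_indices, reducer_indices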
