Add _send_small_objects for sender to optimize sending small objects
hekaisheng committed May 20, 2022
1 parent 61c0c51 commit 2376990
Showing 3 changed files with 165 additions and 62 deletions.
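At a high level, `SenderManagerActor.send_batch_data` now routes each transfer by the total size of the requested objects: batches larger than the transfer block size keep the original reader/writer streaming path (`_send`), while smaller batches are fetched whole and pushed to the receiver in a single call (`_send_small_objects`). A condensed sketch of the new routing, paraphrased from the `transfer.py` diff below (not the verbatim source):

```python
# Condensed sketch of the dispatch added to send_batch_data; names match
# the real code, but surrounding setup and teardown are omitted.
total_size = sum(data_sizes)
if total_size > block_size:
    # Large batch: stream block-by-block through readers and writers.
    await self._send(
        session_id, data_keys, infos, data_sizes,
        block_size, address, band_name, level,
    )
else:
    # Small batch: materialize every object and ship them in one message.
    await self._send_small_objects(
        session_id, data_keys, infos, address, band_name, level
    )
```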
26 changes: 14 additions & 12 deletions mars/services/storage/handler.py
@@ -82,22 +82,23 @@ async def __post_create__(self):
if client.level & level:
clients[level] = client

async def _get_data(self, data_info, conditions):
@mo.extensible
async def get_data_by_info(self, data_info: DataInfo, conditions: List = None):
if conditions is None:
res = yield self._clients[data_info.level].get(data_info.object_id)
res = await self._clients[data_info.level].get(data_info.object_id)
else:
try:
res = yield self._clients[data_info.level].get(
res = await self._clients[data_info.level].get(
data_info.object_id, conditions=conditions
)
except NotImplementedError:
data = yield self._clients[data_info.level].get(data_info.object_id)
data = await self._clients[data_info.level].get(data_info.object_id)
try:
sliced_value = data.iloc[tuple(conditions)]
except AttributeError:
sliced_value = data[tuple(conditions)]
res = sliced_value
raise mo.Return(res)
return res

@mo.extensible
async def get(
@@ -111,7 +112,7 @@ async def get(
data_info = await self._data_manager_ref.get_data_info(
session_id, data_key, self._band_name
)
data = yield self._get_data(data_info, conditions)
data = yield self.get_data_by_info(data_info, conditions)
raise mo.Return(data)
except DataNotExist:
if error == "raise":
@@ -143,7 +144,7 @@ async def batch_get(self, args_list, kwargs_list):
if data_info is None:
results.append(None)
else:
result = yield self._get_data(data_info, conditions)
result = yield self.get_data_by_info(data_info, conditions)
results.append(result)
raise mo.Return(results)

@@ -314,12 +315,16 @@ async def batch_delete(self, args_list, kwargs_list):
for level, size in level_sizes.items():
await self._quota_refs[level].release_quota(size)

@mo.extensible
async def open_reader_by_info(self, data_info: DataInfo) -> StorageFileObject:
return await self._clients[data_info.level].open_reader(data_info.object_id)

@mo.extensible
async def open_reader(self, session_id: str, data_key: str) -> StorageFileObject:
data_info = await self._data_manager_ref.get_data_info(
session_id, data_key, self._band_name
)
reader = await self._clients[data_info.level].open_reader(data_info.object_id)
reader = await self.open_reader_by_info(data_info)
return reader

@open_reader.batch
@@ -333,10 +338,7 @@ async def batch_open_readers(self, args_list, kwargs_list):
)
data_infos = await self._data_manager_ref.get_data_info.batch(*get_data_infos)
return await asyncio.gather(
*[
self._clients[data_info.level].open_reader(data_info.object_id)
for data_info in data_infos
]
*[self.open_reader_by_info(data_info) for data_info in data_infos]
)

@mo.extensible
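The handler changes above split lookup from access: the private `_get_data` becomes the extensible `get_data_by_info`, and `open_reader_by_info` is factored out of `open_reader`, so a caller that already holds a `DataInfo` can skip a second `get_data_info` round trip. A minimal usage sketch, assuming actor handles named `data_manager_ref` and `storage_handler`; the `.delay()`/`.batch()` helpers are the extensible-call pattern already used throughout this diff:

```python
# Resolve DataInfo objects once, then batch the by-info calls.
infos = await data_manager_ref.get_data_info.batch(
    *[
        data_manager_ref.get_data_info.delay(session_id, key, band_name)
        for key in data_keys
    ]
)
readers = await storage_handler.open_reader_by_info.batch(
    *[storage_handler.open_reader_by_info.delay(info) for info in infos]
)
```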
84 changes: 59 additions & 25 deletions mars/services/storage/tests/test_transfer.py
@@ -101,50 +101,84 @@ async def test_simple_transfer(create_actors):
storage_handler1 = await mo.actor_ref(
uid=StorageHandlerActor.gen_uid("numa-0"), address=worker_address_1
)
data_manager1 = await mo.actor_ref(
uid=DataManagerActor.default_uid(), address=worker_address_1
)
storage_handler2 = await mo.actor_ref(
uid=StorageHandlerActor.gen_uid("numa-0"), address=worker_address_2
)
data_manager2 = await mo.actor_ref(
uid=DataManagerActor.default_uid(), address=worker_address_2
)

await storage_handler1.put(session_id, "data_key1", data1, StorageLevel.MEMORY)
await storage_handler1.put(session_id, "data_key2", data2, StorageLevel.MEMORY)
await storage_handler2.put(session_id, "data_key3", data2, StorageLevel.MEMORY)

sender_actor = await mo.actor_ref(
# sender_actor1 uses the default block_size
sender_actor1 = await mo.actor_ref(
address=worker_address_1, uid=SenderManagerActor.gen_uid("numa-0")
)

# send data to worker2 from worker1
await sender_actor.send_batch_data(
session_id,
["data_key1"],
worker_address_2,
StorageLevel.MEMORY,
block_size=1000,
# sender_actor2 sets block_size to 0
sender_actor2 = await mo.create_actor(
SenderManagerActor,
"numa-0",
0,
data_manager1,
storage_handler1,
uid=SenderManagerActor.gen_uid("mock"),
address=worker_address_1,
)

await sender_actor.send_batch_data(
session_id,
["data_key2"],
worker_address_2,
StorageLevel.MEMORY,
block_size=1000,
)
for sender_actor in (sender_actor1, sender_actor2):
# send data to worker2 from worker1
await sender_actor.send_batch_data(
session_id,
["data_key1"],
worker_address_2,
StorageLevel.MEMORY,
block_size=1000,
)

get_data1 = await storage_handler2.get(session_id, "data_key1")
np.testing.assert_array_equal(data1, get_data1)
await sender_actor.send_batch_data(
session_id,
["data_key2"],
worker_address_2,
StorageLevel.MEMORY,
block_size=1000,
)

get_data1 = await storage_handler2.get(session_id, "data_key1")
np.testing.assert_array_equal(data1, get_data1)

get_data2 = await storage_handler2.get(session_id, "data_key2")
pd.testing.assert_frame_equal(data2, get_data2)
get_data2 = await storage_handler2.get(session_id, "data_key2")
pd.testing.assert_frame_equal(data2, get_data2)
await storage_handler2.delete(session_id, "data_key1")
await storage_handler2.delete(session_id, "data_key2")

# send data to worker1 from worker2
sender_actor = await mo.actor_ref(
sender_actor1 = await mo.actor_ref(
address=worker_address_2, uid=SenderManagerActor.gen_uid("numa-0")
)
await sender_actor.send_batch_data(
session_id, ["data_key3"], worker_address_1, StorageLevel.MEMORY
# sender_actor2 sets block_size to 0
sender_actor2 = await mo.create_actor(
SenderManagerActor,
"numa-0",
0,
data_manager2,
storage_handler2,
uid=SenderManagerActor.gen_uid("mock"),
address=worker_address_2,
)
get_data3 = await storage_handler1.get(session_id, "data_key3")
pd.testing.assert_frame_equal(data2, get_data3)
for sender_actor in (sender_actor1, sender_actor2):
# send data to worker1 from worker2
data_key = "data_key3"
await sender_actor.send_batch_data(
session_id, [data_key], worker_address_1, StorageLevel.MEMORY
)
get_data3 = await storage_handler1.get(session_id, data_key)
pd.testing.assert_frame_equal(data2, get_data3)
await storage_handler1.delete(session_id, "data_key3")
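
Note on the test setup: the two senders cover both transfer paths. `sender_actor1` keeps the default transfer block size (4 MiB), so calls that omit `block_size` take the new `_send_small_objects` fast path for these small payloads, while `sender_actor2` is created with a block size of 0, which makes `total_size > block_size` always true and forces the original block-streaming `_send` path.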


# test for cancellation that happens while writing
117 changes: 92 additions & 25 deletions mars/services/storage/transfer.py
@@ -15,7 +15,7 @@
import asyncio
from dataclasses import dataclass
import logging
from typing import Dict, Union, Any, List
from typing import Dict, Union, Any, List, Tuple

from ... import oscar as mo
from ...lib.aio import alru_cache
@@ -28,7 +28,7 @@
)
from ...storage import StorageLevel
from ...utils import dataslots
from .core import DataManagerActor, WrappedStorageFileObject
from .core import DataManagerActor, WrappedStorageFileObject, DataInfo
from .handler import StorageHandlerActor

DEFAULT_TRANSFER_BLOCK_SIZE = 4 * 1024**2
@@ -96,6 +96,7 @@ async def _send_data(
receiver_ref: Union[mo.ActorRef],
session_id: str,
data_keys: List[str],
data_infos: List[DataInfo],
level: StorageLevel,
block_size: int,
):
@@ -129,11 +130,13 @@ async def send(self, buffer, eof_mark, key):

sender = BufferedSender()
open_reader_tasks = []
for data_key in data_keys:
for data_key, info in zip(data_keys, data_infos):
open_reader_tasks.append(
self._storage_handler.open_reader.delay(session_id, data_key)
self._storage_handler.open_reader_by_info.delay(info)
)
readers = await self._storage_handler.open_reader.batch(*open_reader_tasks)
readers = await self._storage_handler.open_reader_by_info.batch(
*open_reader_tasks
)

for data_key, reader in zip(data_keys, readers):
while True:
@@ -152,7 +155,61 @@ async def send(self, buffer, eof_mark, key):
break
await sender.flush()

@mo.extensible
async def _send(
self,
session_id: str,
data_keys: List[Union[str, Tuple]],
data_infos: List[DataInfo],
data_sizes: List[int],
block_size: int,
address: str,
band_name: str,
level: StorageLevel,
):
receiver_ref: Union[
ReceiverManagerActor, mo.ActorRef
] = await self.get_receiver_ref(address, band_name)
is_transferring_list = await receiver_ref.open_writers(
session_id, data_keys, data_sizes, level
)
to_send_keys = []
to_send_infos = []
to_wait_keys = []
for data_key, is_transferring, info in zip(
data_keys, is_transferring_list, data_infos
):
if is_transferring:
to_wait_keys.append(data_key)
else:
to_send_keys.append(data_key)
to_send_infos.append(info)

if to_send_keys:
await self._send_data(
receiver_ref, session_id, to_send_keys, to_send_infos, level, block_size
)
if to_wait_keys:
await receiver_ref.wait_transfer_done(session_id, to_wait_keys)

async def _send_small_objects(
self,
session_id: str,
data_keys: List[Union[str, Tuple]],
data_infos: List[DataInfo],
address: str,
band_name: str,
level: StorageLevel,
):
# simply fetch all objects and send them to the receiver in one call
get_tasks = [
self._storage_handler.get_data_by_info.delay(info) for info in data_infos
]
data_list = await self._storage_handler.get_data_by_info.batch(*get_tasks)
receiver_ref: Union[
ReceiverManagerActor, mo.ActorRef
] = await self.get_receiver_ref(address, band_name)
await receiver_ref.put_small_objects(session_id, data_keys, data_list, level)

async def send_batch_data(
self,
session_id: str,
@@ -167,9 +224,6 @@ async def send_batch_data(
"Begin to send data (%s, %s) to %s", session_id, data_keys, address
)
block_size = block_size or self._transfer_block_size
receiver_ref: Union[
ReceiverManagerActor, mo.ActorRef
] = await self.get_receiver_ref(address, band_name)
get_infos = []
pin_tasks = []
for data_key in data_keys:
@@ -198,23 +252,27 @@ async def send_batch_data(
data_sizes = [info.store_size for info in infos]
if level is None:
level = infos[0].level
is_transferring_list = await receiver_ref.open_writers(
session_id, data_keys, data_sizes, level
)
to_send_keys = []
to_wait_keys = []
for data_key, is_transferring in zip(data_keys, is_transferring_list):
if is_transferring:
to_wait_keys.append(data_key)
else:
to_send_keys.append(data_key)

if to_send_keys:
await self._send_data(
receiver_ref, session_id, to_send_keys, level, block_size
total_size = sum(data_sizes)
if total_size > block_size:
logger.debug("Choose block method for sending data of %s bytes", total_size)
await self._send(
session_id,
data_keys,
infos,
data_sizes,
block_size,
address,
band_name,
level,
)
else:
logger.debug(
"Choose send_small_objects method for sending data of %s bytes",
total_size,
)
await self._send_small_objects(
session_id, data_keys, infos, address, band_name, level
)
if to_wait_keys:
await receiver_ref.wait_transfer_done(session_id, to_wait_keys)
unpin_tasks = []
for data_key in data_keys:
unpin_tasks.append(
@@ -268,6 +326,15 @@ def _decref_writing_key(self, session_id: str, data_key: str):
if self._writing_infos[(session_id, data_key)].ref_counts == 0:
del self._writing_infos[(session_id, data_key)]

async def put_small_objects(
self, session_id: str, data_keys: List[str], objects: List, level: StorageLevel
):
tasks = [
self._storage_handler.put.delay(session_id, data_key, obj, level)
for data_key, obj in zip(data_keys, objects)
]
await self._storage_handler.put.batch(*tasks)

async def create_writers(
self,
session_id: str,
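On the receiving side, `put_small_objects` is the counterpart of the new fast path: instead of opening writers and accepting a stream of blocks, it stores the already-materialized objects with one batched `put`. A hedged end-to-end sketch, reusing actor handles as they appear in `test_simple_transfer` above:

```python
# Hypothetical usage sketch (actor refs and keys as in the tests above).
sender = await mo.actor_ref(
    address=worker_address_1, uid=SenderManagerActor.gen_uid("numa-0")
)
# With the default 4 MiB block size, a small batch is materialized via
# get_data_by_info.batch() on the sender and lands on the receiver through
# a single put_small_objects() call, with no readers, writers, or
# per-block round trips.
await sender.send_batch_data(
    session_id,
    ["data_key1", "data_key2"],
    worker_address_2,
    StorageLevel.MEMORY,
)
```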
