[BACKPORT] Fix the error when multiple subtasks fetch the same data (#2322) #2340

Merged 1 commit on Aug 16, 2021
12 changes: 7 additions & 5 deletions mars/oscar/backends/communication/socket.py
@@ -35,7 +35,7 @@


 class SocketChannel(Channel):
-    __slots__ = 'reader', 'writer', '_channel_type', '_lock'
+    __slots__ = 'reader', 'writer', '_channel_type', '_send_lock', '_recv_lock'

     name = 'socket'

@@ -53,7 +53,8 @@ def __init__(self,
         self.writer = writer
         self._channel_type = channel_type

-        self._lock = asyncio.Lock()
+        self._send_lock = asyncio.Lock()
+        self._recv_lock = asyncio.Lock()

     @property
     @implements(Channel.type)
@@ -69,16 +70,17 @@ async def send(self, message: Any):

         # write buffers
         write_buffers(self.writer, buffers)
-        async with self._lock:
+        async with self._send_lock:
             # add lock, or when parallel send,
             # assertion error may be raised
             await self.writer.drain()

     @implements(Channel.recv)
     async def recv(self):
         deserializer = AioDeserializer(self.reader)
-        header = await deserializer.get_header()
-        buffers = await read_buffers(header, self.reader)
+        async with self._recv_lock:
+            header = await deserializer.get_header()
+            buffers = await read_buffers(header, self.reader)
         return deserialize(header, buffers)

     @implements(Channel.close)
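recv() performs two dependent reads on the same stream (the header, then the buffers), so it now holds its own lock, mirroring the existing lock around writer.drain() in send(); keeping one lock per direction means send and recv never block each other. Below is a minimal sketch of the same pattern outside Mars, with a plain length-prefixed frame standing in for the serializer; the class and frame format are illustrative, not part of this PR.

import asyncio
import struct


class FramedChannel:
    """Toy duplex channel with one lock per direction, as in SocketChannel above."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self.reader = reader
        self.writer = writer
        self._send_lock = asyncio.Lock()   # serializes writer.drain()
        self._recv_lock = asyncio.Lock()   # keeps the header/body reads paired

    async def send(self, payload: bytes):
        self.writer.write(struct.pack('>I', len(payload)) + payload)
        async with self._send_lock:
            await self.writer.drain()

    async def recv(self) -> bytes:
        # without the lock, two concurrent recv() calls could interleave their
        # reads and pair one message's header with another message's body
        async with self._recv_lock:
            header = await self.reader.readexactly(4)
            size, = struct.unpack('>I', header)
            return await self.reader.readexactly(size)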
8 changes: 8 additions & 0 deletions mars/services/meta/api/oscar.py
@@ -216,6 +216,14 @@ async def add_chunk_bands(self,
                               bands: List[BandType]):
         return await self._meta_store.add_chunk_bands(object_id, bands)

+    @add_chunk_bands.batch
+    async def batch_add_chunk_bands(self, args_list, kwargs_list):
+        add_chunk_bands_tasks = []
+        for args, kwargs in zip(args_list, kwargs_list):
+            add_chunk_bands_tasks.append(
+                self._meta_store.add_chunk_bands.delay(*args, **kwargs))
+        return await self._meta_store.add_chunk_bands.batch(*add_chunk_bands_tasks)
+

 class MockMetaAPI(MetaAPI):
     @classmethod
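The new batch handler collects one delayed call per request with .delay(...) and flushes them to the meta store in a single .batch(...) round trip, the delay/batch convention of mo.extensible that the diff itself uses. A hypothetical caller-side sketch of the same convention; meta_api, chunk_keys and band are placeholders, not identifiers from this PR:

async def record_fetched_bands(meta_api, chunk_keys, band):
    # one delayed call per chunk key, submitted to the meta store as a single batch
    delays = [meta_api.add_chunk_bands.delay(key, [band]) for key in chunk_keys]
    return await meta_api.add_chunk_bands.batch(*delays)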
17 changes: 14 additions & 3 deletions mars/services/meta/store/dictionary.py
@@ -102,10 +102,21 @@ async def batch_del_meta(self, args_list, kwargs_list):
         for args, kwargs in zip(args_list, kwargs_list):
             self._del_meta(*args, **kwargs)

+    def _add_chunk_bands(self,
+                         object_id: str,
+                         bands: List[BandType]):
+        meta = self._store[object_id]
+        assert isinstance(meta, _ChunkMeta)
+        meta.bands = list(set(meta.bands) | set(bands))
+
     @implements(AbstractMetaStore.add_chunk_bands)
+    @mo.extensible
     async def add_chunk_bands(self,
                               object_id: str,
                               bands: List[BandType]):
-        meta = self._store[object_id]
-        assert isinstance(meta, _ChunkMeta)
-        meta.bands = list(set(meta.bands) | set(bands))
+        self._add_chunk_bands(object_id, bands)
+
+    @add_chunk_bands.batch
+    async def batch_add_chunk_bands(self, args_list, kwargs_list):
+        for args, kwargs in zip(args_list, kwargs_list):
+            self._add_chunk_bands(*args, **kwargs)
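Extracting the private _add_chunk_bands helper lets the single-call and batch entry points share one implementation, so the two paths cannot drift apart. A toy illustration of that shape, independent of Mars' store types and decorators; all names below are illustrative:

from typing import Dict, List, Set


class ToyBandStore:
    """In-memory stand-in showing a helper shared by single and batch entry points."""

    def __init__(self):
        self._bands: Dict[str, Set[str]] = {}

    def _add_bands(self, object_id: str, bands: List[str]):
        # single source of truth for the update logic
        self._bands.setdefault(object_id, set()).update(bands)

    def add_bands(self, object_id: str, bands: List[str]):
        self._add_bands(object_id, bands)

    def batch_add_bands(self, calls: List[tuple]):
        for object_id, bands in calls:
            self._add_bands(object_id, bands)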
8 changes: 6 additions & 2 deletions mars/services/storage/core.py
@@ -144,7 +144,7 @@ class InternalDataInfo:
     object_info: ObjectInfo


-class DataManagerActor(mo.StatelessActor):
+class DataManagerActor(mo.Actor):
     _data_key_to_info: Dict[tuple, List[InternalDataInfo]]

     def __init__(self, bands: List):
@@ -264,6 +264,7 @@ def pin(self, session_id, data_key, band_name):
         info = self.get_data_info(session_id, data_key, band_name)
         self._spill_strategy[info.level, info.band].pin_data((session_id, data_key))

+    @mo.extensible
     def unpin(self,
               session_id: str,
               data_keys: List[str],
@@ -428,6 +429,7 @@ async def _create_storage_handler_actors(self):
             if band_name.startswith('gpu-'):  # pragma: no cover
                 await mo.create_actor(
                     SenderManagerActor, band_name,
+                    data_manager_ref=self._data_manager,
                     storage_handler_ref=handler_ref,
                     uid=SenderManagerActor.gen_uid(band_name),
                     address=self.address, allocate_strategy=sender_strategy)
@@ -459,7 +461,9 @@ async def _create_transfer_actors(self):
                 address=self.address,
                 allocate_strategy=handler_strategy)
             await mo.create_actor(
-                SenderManagerActor, storage_handler_ref=handler_ref,
+                SenderManagerActor,
+                data_manager_ref=self._data_manager,
+                storage_handler_ref=handler_ref,
                 uid=SenderManagerActor.gen_uid(default_band_name),
                 address=self.address, allocate_strategy=sender_strategy)

10 changes: 8 additions & 2 deletions mars/services/storage/spill.py
@@ -180,6 +180,12 @@ async def spill(request_size: int,
                 break
             else:
                 await writer.write(block_data)
-        await storage_handler.delete_object(
-            session_id, key, size, reader.object_id, level)
+        try:
+            await storage_handler.delete_object(
+                session_id, key, size, reader.object_id, level)
+        except KeyError:  # pragma: no cover
+            # workaround for the case that the object
+            # has been deleted during spill
+            logger.debug('Data %s %s is deleted during spill', session_id, key)
+            await storage_handler.delete(session_id, key, error='ignore')
     logger.debug('Spill finishes, release %s bytes of %s', sum(spill_sizes), level)
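The try/except makes the post-spill cleanup race-tolerant: if another task already removed the object while it was being spilled, the spill logs the fact and continues instead of failing. A small sketch of that pattern with a plain dict standing in for the storage handler; names are illustrative:

import logging

logger = logging.getLogger(__name__)


def release_spilled_object(store: dict, session_id: str, key: str):
    try:
        # may race with a concurrent delete issued by another task
        del store[(session_id, key)]
    except KeyError:
        # the object vanished while it was being spilled; treat the goal as met
        logger.debug('Data %s %s was deleted during spill', session_id, key)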
54 changes: 51 additions & 3 deletions mars/services/storage/tests/test_transfer.py
@@ -23,7 +23,8 @@
 import mars.oscar as mo
 from mars.oscar.backends.allocate_strategy import IdleLabel
 from mars.services.storage.errors import DataNotExist
-from mars.services.storage.core import StorageManagerActor, StorageQuotaActor
+from mars.services.storage.core import DataManagerActor, StorageManagerActor,\
+    StorageQuotaActor
 from mars.services.storage.handler import StorageHandlerActor
 from mars.services.storage.transfer import ReceiverManagerActor, SenderManagerActor
 from mars.storage import StorageLevel
@@ -146,7 +147,7 @@ async def create_writers(self,
                              data_sizes,
                              level):
         await asyncio.sleep(3)
-        await super().create_writers(session_id, data_keys, data_sizes, level)
+        return await super().create_writers(session_id, data_keys, data_sizes, level)


 class MockSenderManagerActor2(SenderManagerActor):
@@ -166,6 +167,8 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver):
     quota_refs = {StorageLevel.MEMORY: await mo.actor_ref(
         StorageQuotaActor, StorageLevel.MEMORY, 5 * 1024 * 1024,
         address=worker_address_2, uid=StorageQuotaActor.gen_uid('numa-0', StorageLevel.MEMORY))}
+    data_manager_ref = await mo.actor_ref(uid=DataManagerActor.default_uid(),
+                                          address=worker_address_1)
     storage_handler1 = await mo.actor_ref(
         uid=StorageHandlerActor.gen_uid('numa-0'),
         address=worker_address_1)
@@ -174,7 +177,8 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver):
         address=worker_address_2)

     sender_actor = await mo.create_actor(
-        mock_sender, uid=mock_sender.default_uid(),
+        mock_sender, data_manager_ref=data_manager_ref,
+        uid=mock_sender.default_uid(),
         address=worker_address_1, allocate_strategy=IdleLabel('io', 'mock_sender'))
     await mo.create_actor(
         mock_receiver, quota_refs, uid=mock_receiver.default_uid(),
@@ -183,6 +187,9 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver):
     data1 = np.random.rand(10, 10)
     await storage_handler1.put('mock', 'data_key1',
                                data1, StorageLevel.MEMORY)
+    data2 = pd.DataFrame(np.random.rand(100, 100))
+    await storage_handler1.put('mock', 'data_key2',
+                               data2, StorageLevel.MEMORY)

     used_before = (await quota_refs[StorageLevel.MEMORY].get_quota())[1]

@@ -206,3 +213,44 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver):
     await send_task
     get_data = await storage_handler2.get('mock', 'data_key1')
     np.testing.assert_array_equal(data1, get_data)

+    # cancel when fetching the same data simultaneously
+    if mock_sender is MockSenderManagerActor:
+        send_task1 = asyncio.create_task(sender_actor.send_batch_data(
+            'mock', ['data_key2'], worker_address_2, StorageLevel.MEMORY))
+        send_task2 = asyncio.create_task(sender_actor.send_batch_data(
+            'mock', ['data_key2'], worker_address_2, StorageLevel.MEMORY))
+        await asyncio.sleep(0.5)
+        send_task1.cancel()
+        with pytest.raises(asyncio.CancelledError):
+            await send_task1
+        await send_task2
+        get_data2 = await storage_handler2.get('mock', 'data_key2')
+        pd.testing.assert_frame_equal(get_data2, data2)
+
+
+@pytest.mark.asyncio
+async def test_transfer_same_data(create_actors):
+    worker_address_1, worker_address_2 = create_actors
+
+    session_id = 'mock_session'
+    data1 = np.random.rand(100, 100)
+    storage_handler1 = await mo.actor_ref(uid=StorageHandlerActor.gen_uid('numa-0'),
+                                          address=worker_address_1)
+    storage_handler2 = await mo.actor_ref(uid=StorageHandlerActor.gen_uid('numa-0'),
+                                          address=worker_address_2)
+
+    await storage_handler1.put(session_id, 'data_key1', data1, StorageLevel.MEMORY)
+    sender_actor = await mo.actor_ref(address=worker_address_1,
+                                      uid=SenderManagerActor.gen_uid('numa-0'))
+
+    # send data to worker2 from worker1
+    task1 = asyncio.create_task(
+        sender_actor.send_batch_data(session_id, ['data_key1'], worker_address_2,
+                                     StorageLevel.MEMORY, block_size=1000))
+    task2 = asyncio.create_task(
+        sender_actor.send_batch_data(session_id, ['data_key1'], worker_address_2,
+                                     StorageLevel.MEMORY, block_size=1000))
+    await asyncio.gather(task1, task2)
+    get_data1 = await storage_handler2.get(session_id, 'data_key1')
+    np.testing.assert_array_equal(data1, get_data1)
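test_transfer_same_data exercises the scenario in the PR title: two concurrent transfers of the same key must both succeed, and (in the extended test_cancel_transfer) cancelling one of them must not break the other. For background only, a common way to make duplicate concurrent fetches safe is to share a single in-flight task per key; the sketch below shows that generic pattern and is not taken from this PR's implementation:

import asyncio
from typing import Any, Awaitable, Callable, Dict


class FetchCoalescer:
    """Share one in-flight fetch per key so duplicate requests await the same result."""

    def __init__(self, fetch: Callable[[str], Awaitable[Any]]):
        self._fetch = fetch
        self._inflight: Dict[str, asyncio.Task] = {}

    async def get(self, key: str) -> Any:
        task = self._inflight.get(key)
        if task is None:
            task = asyncio.ensure_future(self._fetch(key))
            self._inflight[key] = task
            task.add_done_callback(lambda _: self._inflight.pop(key, None))
        # shield the shared task so cancelling one caller does not cancel the fetch
        return await asyncio.shield(task)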