Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng committed Aug 10, 2021
1 parent bc360f5 commit 3723a8e
Show file tree
Hide file tree
Showing 8 changed files with 7,646 additions and 32 deletions.
3 changes: 1 addition & 2 deletions mars/deploy/oscar/tests/test_cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ def test_parse_args():
'store_memory': '20M',
'plasma_directory': '/dev/shm',
}
assert app.config['storage']['filesystem'] == {
assert app.config['storage']['disk'] == {
'root_dirs': '/tmp',
'level': 'DISK',
}
7 changes: 3 additions & 4 deletions mars/deploy/oscar/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ def parse_args(self, parser, argv, environ=None):
storage_config = self.config['storage'] = self.config.get('storage', {})
backends = storage_config['backends'] = storage_config.get('backends', [])
plasma_config = storage_config['plasma'] = storage_config.get('plasma', {})
filesystem_config = storage_config['filesystem'] = storage_config.get('filesystem', {})
disk_config = storage_config['disk'] = storage_config.get('disk', {})
if 'MARS_CACHE_MEM_SIZE' in environ:
plasma_config['store_memory'] = environ['MARS_CACHE_MEM_SIZE']
if 'MARS_PLASMA_DIRS' in environ:
plasma_config['plasma_directory'] = environ['MARS_PLASMA_DIRS']
if 'MARS_SPILL_DIRS' in environ:
backends.append('filesystem')
filesystem_config['root_dirs'] = environ['MARS_SPILL_DIRS']
filesystem_config['level'] = 'DISK'
backends.append('disk')
disk_config['root_dirs'] = environ['MARS_SPILL_DIRS']

return args

Expand Down
16 changes: 14 additions & 2 deletions mars/services/storage/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async def get_spill_keys(self,
return self._spill_strategy[level, band_name].get_spill_keys(size)


class StorageManagerActor(mo.Actor):
class StorageManagerActor(mo.StatelessActor):
"""
Storage manager actor, created only on main process, mainly to setup storage backends
and create all the necessary actors for storage service.
Expand Down Expand Up @@ -358,7 +358,7 @@ async def __post_create__(self):
# create actor for transfer
await self._create_transfer_actors()
# create task for uploading storage usages
await self.upload_storage_info()
self._upload_task = self.ref().upload_storage_info.tell_delay(delay=1)

async def __pre_destroy__(self):
if self._upload_task:
Expand Down Expand Up @@ -515,3 +515,15 @@ async def upload_storage_info(self):
self._cluster_api.set_band_storage_info.delay(band, storage_info))
await self._cluster_api.set_band_storage_info.batch(*upload_tasks)
self._upload_task = self.ref().upload_storage_info.tell_delay(delay=1)

async def upload_disk_info(self):
from ..cluster import DiskInfo

disk_infos = []
if self._cluster_api is not None:
if 'disk' in self._init_params:
params = self._init_params['disk']
size = params['size']
for path in params['root_dirs']:
disk_infos.append(DiskInfo(path=path, size=size))
await self._cluster_api.set_node_disk_info(disk_infos)
Loading

0 comments on commit 3723a8e

Please sign in to comment.