Skip to content

Commit

Permalink
Add storage infos in web (#2317)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng authored Aug 11, 2021
1 parent 3d06bf5 commit 3e308f2
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 26 deletions.
3 changes: 1 addition & 2 deletions mars/deploy/oscar/tests/test_cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ def test_parse_args():
'store_memory': '20M',
'plasma_directory': '/dev/shm',
}
assert app.config['storage']['filesystem'] == {
assert app.config['storage']['disk'] == {
'root_dirs': '/tmp',
'level': 'DISK',
}
7 changes: 3 additions & 4 deletions mars/deploy/oscar/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ def parse_args(self, parser, argv, environ=None):
storage_config = self.config['storage'] = self.config.get('storage', {})
backends = storage_config['backends'] = storage_config.get('backends', [])
plasma_config = storage_config['plasma'] = storage_config.get('plasma', {})
filesystem_config = storage_config['filesystem'] = storage_config.get('filesystem', {})
disk_config = storage_config['disk'] = storage_config.get('disk', {})
if 'MARS_CACHE_MEM_SIZE' in environ:
plasma_config['store_memory'] = environ['MARS_CACHE_MEM_SIZE']
if 'MARS_PLASMA_DIRS' in environ:
plasma_config['plasma_directory'] = environ['MARS_PLASMA_DIRS']
if 'MARS_SPILL_DIRS' in environ:
backends.append('filesystem')
filesystem_config['root_dirs'] = environ['MARS_SPILL_DIRS']
filesystem_config['level'] = 'DISK'
backends.append('disk')
disk_config['root_dirs'] = environ['MARS_SPILL_DIRS']

return args

Expand Down
1 change: 1 addition & 0 deletions mars/services/cluster/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ async def set_band_quota_info(self, band_name: str,
async def set_node_disk_info(self, disk_info: List[DiskInfo]):
await self._uploader_ref.set_node_disk_info(disk_info)

@mo.extensible
async def set_band_storage_info(self, band_name: str, storage_info: StorageInfo):
await self._uploader_ref.set_band_storage_info(band_name, storage_info)

Expand Down
45 changes: 42 additions & 3 deletions mars/services/storage/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async def get_spill_keys(self,
return self._spill_strategy[level, band_name].get_spill_keys(size)


class StorageManagerActor(mo.Actor):
class StorageManagerActor(mo.StatelessActor):
"""
Storage manager actor, created only on main process, mainly to setup storage backends
and create all the necessary actors for storage service.
Expand All @@ -308,6 +308,8 @@ def __init__(self,
self._handler_cls = kwargs.pop('storage_handler_cls', StorageHandlerActor)
self._storage_configs = storage_configs
self._all_bands = None
self._cluster_api = None
self._upload_task = None

# params to init and teardown
self._init_params = defaultdict(dict)
Expand All @@ -324,7 +326,7 @@ async def __post_create__(self):
from .handler import StorageHandlerActor

try:
cluster_api = await ClusterAPI.create(self.address)
self._cluster_api = cluster_api = await ClusterAPI.create(self.address)
band_to_slots = await cluster_api.get_bands()
self._all_bands = [band[1] for band in band_to_slots]
except mo.ActorNotExist:
Expand Down Expand Up @@ -355,8 +357,13 @@ async def __post_create__(self):
await self._create_storage_handler_actors()
# create actor for transfer
await self._create_transfer_actors()
await self.upload_disk_info()
# create task for uploading storage usages
self._upload_task = asyncio.create_task(self.upload_storage_info())

async def __pre_destroy__(self):
if self._upload_task:
self._upload_task.cancel()
for _, params in self._teardown_params:
for backend, teardown_params in params.items():
backend_cls = get_storage_backend(backend)
Expand Down Expand Up @@ -457,7 +464,8 @@ async def _create_transfer_actors(self):
address=self.address, allocate_strategy=sender_strategy)

await mo.create_actor(ReceiverManagerActor,
self._quotas, handler_ref,
self._quotas[default_band_name],
handler_ref,
address=self.address,
uid=ReceiverManagerActor.gen_uid(default_band_name),
allocate_strategy=receiver_strategy)
Expand Down Expand Up @@ -493,3 +501,34 @@ async def _setup_storage(self,

def get_client_params(self):
return self._init_params

async def upload_storage_info(self):
from ..cluster import StorageInfo

if self._cluster_api is not None:
while True:
upload_tasks = []
for band, level_to_quota in self._quotas.items():
for level, quota_ref in level_to_quota.items():
total, used = await quota_ref.get_quota()
used = int(used)
if total is not None:
total = int(total)
storage_info = StorageInfo(storage_level=level,
total_size=total,
used_size=used)
upload_tasks.append(
self._cluster_api.set_band_storage_info.delay(band, storage_info))
await self._cluster_api.set_band_storage_info.batch(*upload_tasks)
await asyncio.sleep(0.5)

async def upload_disk_info(self):
from ..cluster import DiskInfo

disk_infos = []
if self._cluster_api is not None and 'disk' in self._init_params['numa-0']:
params = self._init_params['numa-0']['disk']
size = params['size']
for path in params['root_dirs']:
disk_infos.append(DiskInfo(path=path, limit_size=size))
await self._cluster_api.set_node_disk_info(disk_infos)
5 changes: 4 additions & 1 deletion mars/services/storage/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import sys
import tempfile

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -127,6 +128,7 @@ async def test_storage_mock_api(ray_start_regular, storage_configs):
async def test_web_storage_api():
from mars.services.storage.api.web import StorageWebAPIHandler

tempdir = tempfile.mkdtemp()
start_method = 'fork' if sys.platform != 'win32' else None
pool = await mo.create_actor_pool('127.0.0.1', 1,
subprocess_start_method=start_method)
Expand All @@ -143,7 +145,8 @@ async def test_web_storage_api():
await MockStorageAPI.create(
address=pool.external_address,
session_id=session_id,
storage_configs={'shared_memory': dict()})
storage_configs={'shared_memory': dict(),
'disk': dict(root_dirs=[tempdir])})

web_config = {
'port': get_next_port(),
Expand Down
4 changes: 4 additions & 0 deletions mars/services/storage/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ async def open_writers(self,
async def do_write(self, message: TransferMessage):
# close may be a high cost operation, use create_task
close_tasks = []
finished_keys = []
for data, data_key, is_eof in zip(message.data,
message.data_keys,
message.eof_marks):
Expand All @@ -222,7 +223,10 @@ async def do_write(self, message: TransferMessage):
await writer.write(data)
if is_eof:
close_tasks.append(asyncio.create_task(writer.close()))
finished_keys.append(data_key)
await asyncio.gather(*close_tasks)
for data_key in finished_keys:
self._key_to_writer_info.pop((message.session_id, data_key))

async def receive_part_data(self, message: TransferMessage):
try:
Expand Down
3 changes: 3 additions & 0 deletions mars/services/web/ui/src/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export function toReadableSize(size, trunc) {
let res_size = size;
let size_unit = '';

if (size === null) {
return 'NA';
}
if (size >= 1024 && size < 1024 ** 2) {
res_size = size / 1024.0;
size_unit = 'K';
Expand Down
39 changes: 28 additions & 11 deletions mars/services/web/ui/src/node_info/NodeListPage.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class NodeList extends React.Component {

refreshInfo() {
const roleId = (this.nodeRole === 'supervisor' ? 0 : 1);
fetch(`api/cluster/nodes?role=${roleId}&resource=1&exclude_statuses=-1`)
fetch(`api/cluster/nodes?role=${roleId}&resource=1&detail=1&exclude_statuses=-1`)
.then((res) => res.json())
.then((res) => {
const { state } = this;
Expand Down Expand Up @@ -89,28 +89,45 @@ class NodeList extends React.Component {
return `${reprFun(total - avail)} / ${reprFun(total)}`;
};

const getSharedMemoryInfo = (nodeDetail) => {
const memoryInfo = nodeDetail['numa-0']['memory'];
return`${toReadableSize(memoryInfo['size_used'])} / ${toReadableSize(memoryInfo['size_total'])}`;
};

const generateCells = (endpoint) => {
const rows = [];
rows.push(<TableCell><Link to={`/${this.nodeRole}/${endpoint}`}>{endpoint}</Link></TableCell>);
rows.push(<TableCell>{getNodeStatusText(roleData[endpoint].status)}</TableCell>);
rows.push(<TableCell>{calcNodeStatGroup(roleData[endpoint], 'cpu', (v) => v.toFixed(2))}</TableCell>);
rows.push(<TableCell>{calcNodeStatGroup(roleData[endpoint], 'memory', toReadableSize)}</TableCell>);
if (this.nodeRole === 'worker') {
rows.push(<TableCell>{getSharedMemoryInfo(roleData[endpoint].detail.storage)}</TableCell>);
}
rows.push(<TableCell>{formatTime(roleData[endpoint].update_time)}</TableCell>);
return rows;
};

const roleData = this.state[this.nodeRole];

return (
<Table size="small">
<TableHead>
<TableRow>
<TableCell style={{ fontWeight: 'bolder' }}>Endpoint</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>Status</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>CPU</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>Memory</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>Update Time</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Endpoint</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Status</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>CPU</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Memory</TableCell>
{this.nodeRole === 'worker' &&
<TableCell style={{fontWeight: 'bolder'}}>Shared Memory</TableCell>
}
<TableCell style={{fontWeight: 'bolder'}}>Update Time</TableCell>
</TableRow>
</TableHead>
<TableBody>
{
Object.keys(roleData).map((endpoint) => (
<TableRow key={`nodeList_${this.nodeRole}_${endpoint}`}>
<TableCell><Link to={`/${this.nodeRole}/${endpoint}`}>{endpoint}</Link></TableCell>
<TableCell>{getNodeStatusText(roleData[endpoint].status)}</TableCell>
<TableCell>{calcNodeStatGroup(roleData[endpoint], 'cpu', (v) => v.toFixed(2))}</TableCell>
<TableCell>{calcNodeStatGroup(roleData[endpoint], 'memory', toReadableSize)}</TableCell>
<TableCell>{formatTime(roleData[endpoint].update_time)}</TableCell>
{generateCells(endpoint)}
</TableRow>
))
}
Expand Down
31 changes: 31 additions & 0 deletions mars/services/web/ui/src/node_info/NodeResourceTab.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,37 @@ export default class NodeResourceTab extends React.Component {
</Table>
</React.Fragment>
}
{Object.keys(this.state.detail.storage).length &&
<React.Fragment>
<Title component="h3">Storage</Title>
<Table>
<TableHead>
<TableRow>
<TableCell style={{fontWeight: 'bolder'}}>Band</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Level</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Value</TableCell>
</TableRow>
</TableHead>
<TableBody>
{Object.keys(this.state.detail.storage).map((band) => (
Object.keys(this.state.detail.storage[band]).map((level) => (
<TableRow key={`${band}-storage`}>
<TableCell>{band}</TableCell>
<TableCell>{level}</TableCell>
<TableCell>
<div>
Used:{toReadableSize(this.state.detail.storage[band][level].size_used)}
</div>
<div>
Total:{toReadableSize(this.state.detail.storage[band][level].size_total)}
</div>
</TableCell>
</TableRow>
))))}
</TableBody>
</Table>
</React.Fragment>
}
</div>
);
}
Expand Down
11 changes: 11 additions & 0 deletions mars/storage/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,14 @@ async def open_writer(self, size=None) -> StorageFileObject:
async def open_reader(self, object_id) -> StorageFileObject:
file = await self._fs.open(object_id, 'rb')
return StorageFileObject(file, file.name)


@register_storage_backend
class DiskStorage(FileSystemStorage):
name = 'disk'

@classmethod
@implements(StorageBackend.setup)
async def setup(cls, **kwargs) -> Tuple[Dict, Dict]:
kwargs['level'] = StorageLevel.DISK
return await super().setup(**kwargs)
9 changes: 4 additions & 5 deletions mars/storage/tests/test_libs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from mars.serialization import AioSerializer, AioDeserializer
from mars.storage.base import StorageLevel
from mars.storage.cuda import CudaStorage
from mars.storage.filesystem import FileSystemStorage
from mars.storage.filesystem import DiskStorage
from mars.storage.plasma import PlasmaStorage
from mars.storage.shared_memory import SharedMemoryStorage
from mars.storage.vineyard import VineyardStorage
Expand Down Expand Up @@ -65,11 +65,10 @@
async def storage_context(ray_start_regular, request):
if request.param == 'filesystem':
tempdir = tempfile.mkdtemp()
params, teardown_params = await FileSystemStorage.setup(
params, teardown_params = await DiskStorage.setup(
fs=LocalFileSystem(),
root_dirs=[tempdir],
level=StorageLevel.DISK)
storage = FileSystemStorage(**params)
root_dirs=[tempdir])
storage = DiskStorage(**params)
assert storage.level == StorageLevel.DISK

yield storage
Expand Down

0 comments on commit 3e308f2

Please sign in to comment.