Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT] Add storage infos in web (#2317) #2333

Merged
merged 1 commit into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions mars/deploy/oscar/tests/test_cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ def test_parse_args():
'store_memory': '20M',
'plasma_directory': '/dev/shm',
}
assert app.config['storage']['filesystem'] == {
assert app.config['storage']['disk'] == {
'root_dirs': '/tmp',
'level': 'DISK',
}
7 changes: 3 additions & 4 deletions mars/deploy/oscar/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ def parse_args(self, parser, argv, environ=None):
storage_config = self.config['storage'] = self.config.get('storage', {})
backends = storage_config['backends'] = storage_config.get('backends', [])
plasma_config = storage_config['plasma'] = storage_config.get('plasma', {})
filesystem_config = storage_config['filesystem'] = storage_config.get('filesystem', {})
disk_config = storage_config['disk'] = storage_config.get('disk', {})
if 'MARS_CACHE_MEM_SIZE' in environ:
plasma_config['store_memory'] = environ['MARS_CACHE_MEM_SIZE']
if 'MARS_PLASMA_DIRS' in environ:
plasma_config['plasma_directory'] = environ['MARS_PLASMA_DIRS']
if 'MARS_SPILL_DIRS' in environ:
backends.append('filesystem')
filesystem_config['root_dirs'] = environ['MARS_SPILL_DIRS']
filesystem_config['level'] = 'DISK'
backends.append('disk')
disk_config['root_dirs'] = environ['MARS_SPILL_DIRS']

return args

Expand Down
1 change: 1 addition & 0 deletions mars/services/cluster/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ async def set_band_quota_info(self, band_name: str,
async def set_node_disk_info(self, disk_info: List[DiskInfo]):
await self._uploader_ref.set_node_disk_info(disk_info)

@mo.extensible
async def set_band_storage_info(self, band_name: str, storage_info: StorageInfo):
await self._uploader_ref.set_band_storage_info(band_name, storage_info)

Expand Down
45 changes: 42 additions & 3 deletions mars/services/storage/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async def get_spill_keys(self,
return self._spill_strategy[level, band_name].get_spill_keys(size)


class StorageManagerActor(mo.Actor):
class StorageManagerActor(mo.StatelessActor):
"""
Storage manager actor, created only on main process, mainly to setup storage backends
and create all the necessary actors for storage service.
Expand All @@ -308,6 +308,8 @@ def __init__(self,
self._handler_cls = kwargs.pop('storage_handler_cls', StorageHandlerActor)
self._storage_configs = storage_configs
self._all_bands = None
self._cluster_api = None
self._upload_task = None

# params to init and teardown
self._init_params = defaultdict(dict)
Expand All @@ -324,7 +326,7 @@ async def __post_create__(self):
from .handler import StorageHandlerActor

try:
cluster_api = await ClusterAPI.create(self.address)
self._cluster_api = cluster_api = await ClusterAPI.create(self.address)
band_to_slots = await cluster_api.get_bands()
self._all_bands = [band[1] for band in band_to_slots]
except mo.ActorNotExist:
Expand Down Expand Up @@ -355,8 +357,13 @@ async def __post_create__(self):
await self._create_storage_handler_actors()
# create actor for transfer
await self._create_transfer_actors()
await self.upload_disk_info()
# create task for uploading storage usages
self._upload_task = asyncio.create_task(self.upload_storage_info())

async def __pre_destroy__(self):
if self._upload_task:
self._upload_task.cancel()
for _, params in self._teardown_params:
for backend, teardown_params in params.items():
backend_cls = get_storage_backend(backend)
Expand Down Expand Up @@ -457,7 +464,8 @@ async def _create_transfer_actors(self):
address=self.address, allocate_strategy=sender_strategy)

await mo.create_actor(ReceiverManagerActor,
self._quotas, handler_ref,
self._quotas[default_band_name],
handler_ref,
address=self.address,
uid=ReceiverManagerActor.gen_uid(default_band_name),
allocate_strategy=receiver_strategy)
Expand Down Expand Up @@ -493,3 +501,34 @@ async def _setup_storage(self,

def get_client_params(self):
return self._init_params

async def upload_storage_info(self):
from ..cluster import StorageInfo

if self._cluster_api is not None:
while True:
upload_tasks = []
for band, level_to_quota in self._quotas.items():
for level, quota_ref in level_to_quota.items():
total, used = await quota_ref.get_quota()
used = int(used)
if total is not None:
total = int(total)
storage_info = StorageInfo(storage_level=level,
total_size=total,
used_size=used)
upload_tasks.append(
self._cluster_api.set_band_storage_info.delay(band, storage_info))
await self._cluster_api.set_band_storage_info.batch(*upload_tasks)
await asyncio.sleep(0.5)

async def upload_disk_info(self):
from ..cluster import DiskInfo

disk_infos = []
if self._cluster_api is not None and 'disk' in self._init_params['numa-0']:
params = self._init_params['numa-0']['disk']
size = params['size']
for path in params['root_dirs']:
disk_infos.append(DiskInfo(path=path, limit_size=size))
await self._cluster_api.set_node_disk_info(disk_infos)
5 changes: 4 additions & 1 deletion mars/services/storage/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import sys
import tempfile

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -127,6 +128,7 @@ async def test_storage_mock_api(ray_start_regular, storage_configs):
async def test_web_storage_api():
from mars.services.storage.api.web import StorageWebAPIHandler

tempdir = tempfile.mkdtemp()
start_method = 'fork' if sys.platform != 'win32' else None
pool = await mo.create_actor_pool('127.0.0.1', 1,
subprocess_start_method=start_method)
Expand All @@ -143,7 +145,8 @@ async def test_web_storage_api():
await MockStorageAPI.create(
address=pool.external_address,
session_id=session_id,
storage_configs={'shared_memory': dict()})
storage_configs={'shared_memory': dict(),
'disk': dict(root_dirs=[tempdir])})

web_config = {
'port': get_next_port(),
Expand Down
4 changes: 4 additions & 0 deletions mars/services/storage/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ async def open_writers(self,
async def do_write(self, message: TransferMessage):
# close may be a high cost operation, use create_task
close_tasks = []
finished_keys = []
for data, data_key, is_eof in zip(message.data,
message.data_keys,
message.eof_marks):
Expand All @@ -222,7 +223,10 @@ async def do_write(self, message: TransferMessage):
await writer.write(data)
if is_eof:
close_tasks.append(asyncio.create_task(writer.close()))
finished_keys.append(data_key)
await asyncio.gather(*close_tasks)
for data_key in finished_keys:
self._key_to_writer_info.pop((message.session_id, data_key))

async def receive_part_data(self, message: TransferMessage):
try:
Expand Down
3 changes: 3 additions & 0 deletions mars/services/web/ui/src/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export function toReadableSize(size, trunc) {
let res_size = size;
let size_unit = '';

if (size === null) {
return 'NA';
}
if (size >= 1024 && size < 1024 ** 2) {
res_size = size / 1024.0;
size_unit = 'K';
Expand Down
39 changes: 28 additions & 11 deletions mars/services/web/ui/src/node_info/NodeListPage.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class NodeList extends React.Component {

refreshInfo() {
const roleId = (this.nodeRole === 'supervisor' ? 0 : 1);
fetch(`api/cluster/nodes?role=${roleId}&resource=1&exclude_statuses=-1`)
fetch(`api/cluster/nodes?role=${roleId}&resource=1&detail=1&exclude_statuses=-1`)
.then((res) => res.json())
.then((res) => {
const { state } = this;
Expand Down Expand Up @@ -89,28 +89,45 @@ class NodeList extends React.Component {
return `${reprFun(total - avail)} / ${reprFun(total)}`;
};

const getSharedMemoryInfo = (nodeDetail) => {
const memoryInfo = nodeDetail['numa-0']['memory'];
return`${toReadableSize(memoryInfo['size_used'])} / ${toReadableSize(memoryInfo['size_total'])}`;
};

const generateCells = (endpoint) => {
const rows = [];
rows.push(<TableCell><Link to={`/${this.nodeRole}/${endpoint}`}>{endpoint}</Link></TableCell>);
rows.push(<TableCell>{getNodeStatusText(roleData[endpoint].status)}</TableCell>);
rows.push(<TableCell>{calcNodeStatGroup(roleData[endpoint], 'cpu', (v) => v.toFixed(2))}</TableCell>);
rows.push(<TableCell>{calcNodeStatGroup(roleData[endpoint], 'memory', toReadableSize)}</TableCell>);
if (this.nodeRole === 'worker') {
rows.push(<TableCell>{getSharedMemoryInfo(roleData[endpoint].detail.storage)}</TableCell>);
}
rows.push(<TableCell>{formatTime(roleData[endpoint].update_time)}</TableCell>);
return rows;
};

const roleData = this.state[this.nodeRole];

return (
<Table size="small">
<TableHead>
<TableRow>
<TableCell style={{ fontWeight: 'bolder' }}>Endpoint</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>Status</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>CPU</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>Memory</TableCell>
<TableCell style={{ fontWeight: 'bolder' }}>Update Time</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Endpoint</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Status</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>CPU</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Memory</TableCell>
{this.nodeRole === 'worker' &&
<TableCell style={{fontWeight: 'bolder'}}>Shared Memory</TableCell>
}
<TableCell style={{fontWeight: 'bolder'}}>Update Time</TableCell>
</TableRow>
</TableHead>
<TableBody>
{
Object.keys(roleData).map((endpoint) => (
<TableRow key={`nodeList_${this.nodeRole}_${endpoint}`}>
<TableCell><Link to={`/${this.nodeRole}/${endpoint}`}>{endpoint}</Link></TableCell>
<TableCell>{getNodeStatusText(roleData[endpoint].status)}</TableCell>
<TableCell>{calcNodeStatGroup(roleData[endpoint], 'cpu', (v) => v.toFixed(2))}</TableCell>
<TableCell>{calcNodeStatGroup(roleData[endpoint], 'memory', toReadableSize)}</TableCell>
<TableCell>{formatTime(roleData[endpoint].update_time)}</TableCell>
{generateCells(endpoint)}
</TableRow>
))
}
Expand Down
31 changes: 31 additions & 0 deletions mars/services/web/ui/src/node_info/NodeResourceTab.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,37 @@ export default class NodeResourceTab extends React.Component {
</Table>
</React.Fragment>
}
{Object.keys(this.state.detail.storage).length &&
<React.Fragment>
<Title component="h3">Storage</Title>
<Table>
<TableHead>
<TableRow>
<TableCell style={{fontWeight: 'bolder'}}>Band</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Level</TableCell>
<TableCell style={{fontWeight: 'bolder'}}>Value</TableCell>
</TableRow>
</TableHead>
<TableBody>
{Object.keys(this.state.detail.storage).map((band) => (
Object.keys(this.state.detail.storage[band]).map((level) => (
<TableRow key={`${band}-storage`}>
<TableCell>{band}</TableCell>
<TableCell>{level}</TableCell>
<TableCell>
<div>
Used:{toReadableSize(this.state.detail.storage[band][level].size_used)}
</div>
<div>
Total:{toReadableSize(this.state.detail.storage[band][level].size_total)}
</div>
</TableCell>
</TableRow>
))))}
</TableBody>
</Table>
</React.Fragment>
}
</div>
);
}
Expand Down
11 changes: 11 additions & 0 deletions mars/storage/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,14 @@ async def open_writer(self, size=None) -> StorageFileObject:
async def open_reader(self, object_id) -> StorageFileObject:
file = await self._fs.open(object_id, 'rb')
return StorageFileObject(file, file.name)


@register_storage_backend
class DiskStorage(FileSystemStorage):
name = 'disk'

@classmethod
@implements(StorageBackend.setup)
async def setup(cls, **kwargs) -> Tuple[Dict, Dict]:
kwargs['level'] = StorageLevel.DISK
return await super().setup(**kwargs)
9 changes: 4 additions & 5 deletions mars/storage/tests/test_libs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from mars.serialization import AioSerializer, AioDeserializer
from mars.storage.base import StorageLevel
from mars.storage.cuda import CudaStorage
from mars.storage.filesystem import FileSystemStorage
from mars.storage.filesystem import DiskStorage
from mars.storage.plasma import PlasmaStorage
from mars.storage.shared_memory import SharedMemoryStorage
from mars.storage.vineyard import VineyardStorage
Expand Down Expand Up @@ -65,11 +65,10 @@
async def storage_context(ray_start_regular, request):
if request.param == 'filesystem':
tempdir = tempfile.mkdtemp()
params, teardown_params = await FileSystemStorage.setup(
params, teardown_params = await DiskStorage.setup(
fs=LocalFileSystem(),
root_dirs=[tempdir],
level=StorageLevel.DISK)
storage = FileSystemStorage(**params)
root_dirs=[tempdir])
storage = DiskStorage(**params)
assert storage.level == StorageLevel.DISK

yield storage
Expand Down