Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support mutable tensor on oscar #2432

Merged
merged 49 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
756f079
feat: split chunks and assign
Coco58323 Aug 28, 2021
c8d8c33
feat:read or write single index of tensor
Sep 6, 2021
6e851c3
refactor: add service for mutable_tensor
Coco58323 Sep 6, 2021
eaaba46
test: tensor create, read/write for single index
Coco58323 Sep 6, 2021
e2b1901
fix: worker address;feat: get mutable tensor
Coco58323 Sep 8, 2021
f5a2267
style
Sep 9, 2021
c62d13e
style
Coco58323 Sep 9, 2021
8e74ae0
style;fix
Coco58323 Sep 9, 2021
ff2b4f7
resolve conflict
Coco58323 Sep 9, 2021
d4a3875
style
Coco58323 Sep 10, 2021
a6ae305
Merge branch 'master' into master
Coco58323 Sep 10, 2021
fd6ce49
style
Coco58323 Sep 10, 2021
64725fb
skip windows
Coco58323 Sep 10, 2021
4c53152
don't let pass no cover
Coco58323 Sep 10, 2021
d047b92
fix: api from session to mutable
Coco58323 Sep 17, 2021
55ae591
style
Sep 17, 2021
18a763e
perf:client -> worker
Sep 17, 2021
20ac8c7
feat:utilize storage_api
Coco58323 Sep 18, 2021
790fd81
style
Coco58323 Sep 18, 2021
25c249e
feat:support version
Coco58323 Sep 19, 2021
46f5b6a
style
Coco58323 Sep 19, 2021
fe3ad64
fix: lack of web
Coco58323 Sep 22, 2021
bb30c00
remove mutable from session api
Coco58323 Sep 25, 2021
e7907dd
style
Coco58323 Sep 26, 2021
534659f
style
Coco58323 Sep 26, 2021
82672d5
seal test
Coco58323 Sep 26, 2021
e5da351
feat: web mutable api
Coco58323 Sep 29, 2021
014f3b6
style
Coco58323 Sep 29, 2021
31f87fe
style
Coco58323 Sep 29, 2021
caa3406
style
Coco58323 Sep 29, 2021
aa1177e
style
Coco58323 Sep 29, 2021
fe73f1a
fix: WebMutableAPI(session_id, address)
Coco58323 Sep 29, 2021
af517e9
Fix codacy & refine
Oct 9, 2021
8712bff
Merge remote-tracking branch 'upstream/master' into coco-master
Oct 9, 2021
b759510
Refactor the MutableTensor PR.
sighingnow Oct 10, 2021
7ef0fde
Fixes serialization on WebSession.
sighingnow Oct 11, 2021
d7ba913
Fixes webapi test.
sighingnow Oct 11, 2021
973c4e7
Fixes.
sighingnow Oct 11, 2021
b85975d
Fixes flake8.
sighingnow Oct 11, 2021
bb839bc
Fixes lint and improve coverage.
sighingnow Oct 11, 2021
603cb86
Rework mutable tensor.
sighingnow Oct 15, 2021
1bbe13b
Timeout on seal, and docs.
sighingnow Oct 16, 2021
759e4f7
Fixes.
sighingnow Oct 16, 2021
afceab3
Reduce size of test tensor.
sighingnow Oct 16, 2021
07c6245
Fixes.
sighingnow Oct 16, 2021
c850639
Update.
sighingnow Oct 16, 2021
249d90d
Fixes syntax error.
sighingnow Oct 16, 2021
13e30c6
Add MockMutableAPI, and more coverage on read/write with timestamps.
sighingnow Oct 16, 2021
fe467ad
Fixes.
sighingnow Oct 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mars/deploy/oscar/base_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
- scheduling
- subtask
- task
- mutable
cluster:
backend: fixed
node_timeout: 120
Expand Down
159 changes: 157 additions & 2 deletions mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from typing import Any, Callable, Coroutine, Dict, List, \
Optional, Tuple, Type, Union

import numpy as np

from ... import oscar as mo
from ...config import options
from ...core import ChunkType, TileableType, TileableGraph, enter_mode
Expand All @@ -41,6 +43,7 @@
from ...services.lifecycle import AbstractLifecycleAPI, LifecycleAPI
from ...services.meta import MetaAPI, AbstractMetaAPI
from ...services.session import AbstractSessionAPI, SessionAPI
from ...services.mutable import MutableAPI, MutableTensor
from ...services.storage import StorageAPI
from ...services.task import AbstractTaskAPI, TaskAPI, TaskResult
from ...services.web import OscarWebAPI
Expand Down Expand Up @@ -345,6 +348,53 @@ async def destroy_remote_object(self,
name : str
"""

@abstractmethod
async def create_mutable_tensor(self,
                                shape: tuple,
                                dtype: Union[np.dtype, str],
                                name: str = None,
                                default_value: Union[int, float] = 0,
                                chunk_size: Union[int, Tuple] = None):
    """
    Create a new mutable tensor.

    Parameters
    ----------
    shape : tuple
        Shape of the tensor to create.
    dtype : np.dtype or str
        Data type of the tensor.
    name : str, optional
        Name of the tensor. When it is not given, a random name
        is used instead.
    default_value : int or float, optional
        Default value of the tensor. Defaults to 0.
    chunk_size : int or tuple, optional
        Chunk size of the tensor.

    Returns
    -------
    MutableTensor
    """

@abstractmethod
async def get_mutable_tensor(self, name: str):
    """
    Look up an existing mutable tensor by its name.

    Parameters
    ----------
    name : str
        Name of the mutable tensor to retrieve.

    Returns
    -------
    MutableTensor
    """

async def stop_server(self):
"""
Stop server.
Expand Down Expand Up @@ -513,6 +563,53 @@ def get_web_endpoint(self) -> Optional[str]:
web endpoint
"""

@abstractmethod
def create_mutable_tensor(self,
                          shape: tuple,
                          dtype: Union[np.dtype, str],
                          name: str = None,
                          default_value: Union[int, float] = 0,
                          chunk_size: Union[int, Tuple] = None):
    """
    Create a new mutable tensor.

    Parameters
    ----------
    shape : tuple
        Shape of the tensor to create.
    dtype : np.dtype or str
        Data type of the tensor.
    name : str, optional
        Name of the tensor. When it is not given, a random name
        is used instead.
    default_value : int or float, optional
        Default value of the tensor. Defaults to 0.
    chunk_size : int or tuple, optional
        Chunk size of the tensor.

    Returns
    -------
    MutableTensor
    """

@abstractmethod
def get_mutable_tensor(self, name: str):
    """
    Look up an existing mutable tensor by its name.

    Parameters
    ----------
    name : str
        Name of the mutable tensor to retrieve.

    Returns
    -------
    MutableTensor
    """

def fetch_log(self,
tileables: List[TileableType],
offsets: List[int] = None,
Expand Down Expand Up @@ -608,6 +705,7 @@ def __init__(self,
meta_api: AbstractMetaAPI,
lifecycle_api: AbstractLifecycleAPI,
task_api: AbstractTaskAPI,
mutable_api: MutableAPI,
cluster_api: AbstractClusterAPI,
web_api: Optional[OscarWebAPI],
client: ClientType = None,
Expand All @@ -617,6 +715,7 @@ def __init__(self,
self._task_api = task_api
self._meta_api = meta_api
self._lifecycle_api = lifecycle_api
self._mutable_api = mutable_api
self._cluster_api = cluster_api
self._web_api = web_api
self.client = client
Expand All @@ -641,14 +740,15 @@ async def _init(cls,
lifecycle_api = await LifecycleAPI.create(session_id, session_address)
meta_api = await MetaAPI.create(session_id, session_address)
task_api = await TaskAPI.create(session_id, session_address)
mutable_api = await MutableAPI.create(session_id, session_address)
cluster_api = await ClusterAPI.create(session_address)
try:
web_api = await OscarWebAPI.create(session_address)
except mo.ActorNotExist:
web_api = None
return cls(address, session_id,
session_api, meta_api,
lifecycle_api, task_api,
lifecycle_api, task_api, mutable_api,
cluster_api, web_api,
timeout=timeout)

Expand Down Expand Up @@ -1016,6 +1116,20 @@ async def destroy_remote_object(self,
name: str):
return await self._session_api.destroy_remote_object(session_id, name)

async def create_mutable_tensor(self,
                                shape: tuple,
                                dtype: Union[np.dtype, str],
                                name: str = None,
                                default_value: Union[int, float] = 0,
                                chunk_size: Union[int, Tuple] = None):
    """
    Create a mutable tensor through the session's mutable API.

    All arguments are forwarded positionally to
    ``self._mutable_api.create_mutable_tensor``.

    Returns
    -------
    tuple
        ``(tensor_info, mutable_api)``: the value returned by the mutable
        API call, paired with the API object itself.
    """
    api = self._mutable_api
    info = await api.create_mutable_tensor(
        shape, dtype, name, default_value, chunk_size)
    # The API object is returned alongside the info so that the caller
    # can build a tensor handle from both.
    return info, api

async def get_mutable_tensor(self, name: str):
    """
    Fetch a mutable tensor by name through the session's mutable API.

    Returns
    -------
    tuple
        ``(tensor_info, mutable_api)``: the value returned by
        ``self._mutable_api.get_mutable_tensor(name)``, paired with the
        API object itself.
    """
    api = self._mutable_api
    info = await api.get_mutable_tensor(name)
    return info, api

async def stop_server(self):
if self.client:
await self.client.stop()
Expand All @@ -1032,6 +1146,7 @@ async def _init(cls,
from ...services.lifecycle import WebLifecycleAPI
from ...services.meta import WebMetaAPI
from ...services.task import WebTaskAPI
from ...services.mutable import WebMutableAPI
from ...services.cluster import WebClusterAPI

session_api = WebSessionAPI(address)
Expand All @@ -1043,12 +1158,13 @@ async def _init(cls,
lifecycle_api = WebLifecycleAPI(session_id, address)
meta_api = WebMetaAPI(session_id, address)
task_api = WebTaskAPI(session_id, address)
mutable_api = WebMutableAPI(session_id, address)
cluster_api = WebClusterAPI(address)

return cls(address, session_id,
session_api, meta_api,
lifecycle_api, task_api,
cluster_api, None, timeout=timeout)
mutable_api, cluster_api, None, timeout=timeout)

async def get_web_endpoint(self) -> Optional[str]:
    """Return ``self.address`` as the web endpoint of this session."""
    endpoint = self.address
    return endpoint
Expand Down Expand Up @@ -1184,6 +1300,25 @@ async def destroy_remote_object(self,
name: str):
pass # pragma: no cover

@implements(AbstractAsyncSession.create_mutable_tensor)
async def create_mutable_tensor(self,
                                shape: tuple,
                                dtype: Union[np.dtype, str],
                                name: str = None,
                                default_value: Union[int, float] = 0,
                                chunk_size: Union[int, Tuple] = None):
    # Delegate creation to the isolated session, which yields a
    # (tensor_info, mutable_api) pair.
    created = await self._isolated_session.create_mutable_tensor(
        shape, dtype, name, default_value, chunk_size)
    info, api = created
    # Build the MutableTensor handle from that pair and this session's loop.
    return MutableTensor.create(info, api, self._loop)

@implements(AbstractAsyncSession.get_mutable_tensor)
async def get_mutable_tensor(self,
                             name: str):
    # Delegate the lookup to the isolated session, which yields a
    # (tensor_info, mutable_api) pair.
    found = await self._isolated_session.get_mutable_tensor(name)
    info, api = found
    # Build the MutableTensor handle from that pair and this session's loop.
    return MutableTensor.create(info, api, self._loop)

@implements(AbstractAsyncSession.get_web_endpoint)
@_delegate_to_isolated_session
async def get_web_endpoint(self) -> Optional[str]:
Expand Down Expand Up @@ -1377,6 +1512,26 @@ def get_web_endpoint(self) -> Optional[str]:
def get_cluster_versions(self) -> List[str]:
pass # pragma: no cover

@implements(AbstractSyncSession.create_mutable_tensor)
def create_mutable_tensor(self,
                          shape: tuple,
                          dtype: Union[np.dtype, str],
                          name: str = None,
                          default_value: Union[int, float] = 0,
                          chunk_size: Union[int, Tuple] = None):
    # Schedule the isolated session's coroutine on self._loop and block
    # the calling thread until its result is available.
    pending = asyncio.run_coroutine_threadsafe(
        self._isolated_session.create_mutable_tensor(
            shape, dtype, name, default_value, chunk_size),
        self._loop)
    info, api = pending.result()
    # Build the MutableTensor handle from the returned pair and the same loop.
    return MutableTensor.create(info, api, self._loop)

@implements(AbstractSyncSession.get_mutable_tensor)
def get_mutable_tensor(self, name: str):
    # Schedule the isolated session's lookup on self._loop and block the
    # calling thread until its result is available.
    pending = asyncio.run_coroutine_threadsafe(
        self._isolated_session.get_mutable_tensor(name),
        self._loop)
    info, api = pending.result()
    # Build the MutableTensor handle from the returned pair and the same loop.
    return MutableTensor.create(info, api, self._loop)

def destroy(self):
coro = self._isolated_session.destroy()
asyncio.run_coroutine_threadsafe(coro, self._loop).result()
Expand Down
17 changes: 17 additions & 0 deletions mars/services/mutable/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .api import AbstractMutableAPI, MutableAPI, MockMutableAPI, WebMutableAPI
from .core import MutableTensor
from .supervisor import MutableObjectManagerActor, MutableTensorActor
17 changes: 17 additions & 0 deletions mars/services/mutable/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .core import AbstractMutableAPI
from .oscar import MutableAPI, MockMutableAPI
from .web import WebMutableAPI
Loading