[BACKPORT] Upgrade azure-pipelines to Python 3.9 (#2862) #2886

Merged · 1 commit · Mar 30, 2022
1 change: 1 addition & 0 deletions .github/workflows/docker-cd.yml
@@ -26,6 +26,7 @@ jobs:
       shell: bash
       env:
         DOCKER_ORG: ${{ secrets.DOCKERHUB_USERNAME }}
+      if: ${{ github.repository == 'mars-project/mars' }}
       run: |
         source ./ci/reload-env.sh
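
The added if: guard restricts this step to the upstream mars-project/mars repository, presumably so forks running the workflow skip the Docker publish rather than fail for lack of Docker Hub credentials.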
4 changes: 2 additions & 2 deletions azure-pipelines.yml
@@ -23,7 +23,7 @@ jobs:
       mars.test.module: 'tensor'

   variables:
-    PYTHON: '3.8'
+    PYTHON: '3.9'

   steps:
     - powershell: |
@@ -120,7 +120,7 @@ jobs:
     vmImage: 'ubuntu-latest'

   variables:
-    PYTHON: '3.8'
+    PYTHON: '3.9'

   steps:
     - bash: |
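
The version bump from 3.8 to 3.9 applies to both the Windows and Linux jobs. Most of the remaining files in this diff adapt Mars to asyncio behavior that tightened around these releases: since Python 3.8, passing bare coroutines to asyncio.wait() is deprecated (support is removed in Python 3.11), so the call sites below wrap coroutines in tasks first.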
65 changes: 52 additions & 13 deletions mars/core/entity/executable.py
@@ -13,7 +13,9 @@
 # limitations under the License.

 import asyncio
+import atexit
 import concurrent.futures
+import queue
 import threading
 from typing import List
 from weakref import WeakKeyDictionary, ref
@@ -23,7 +25,55 @@
 from ..mode import enter_mode


-_decref_pool = concurrent.futures.ThreadPoolExecutor()
+class DecrefRunner:
+    def __init__(self):
+        self._decref_thread = None
+        self._queue = queue.Queue()
+
+    def start(self):
+        self._decref_thread = threading.Thread(
+            target=self._thread_body, name="DecrefThread"
+        )
+        self._decref_thread.daemon = True
+        self._decref_thread.start()
+
+    def _thread_body(self):
+        from ...deploy.oscar.session import SyncSession
+
+        while True:
+            key, session_ref, fut = self._queue.get()
+            if key is None:
+                break
+
+            session = session_ref()
+            if session is None:
+                fut.set_result(None)
+                continue
+            try:
+                s = SyncSession.from_isolated_session(session)
+                s.decref(key)
+                fut.set_result(None)
+            except (RuntimeError, ConnectionError, KeyError):
+                fut.set_result(None)
+            except Exception as ex:  # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except
+                fut.set_exception(ex)
+
+    def stop(self):
+        if self._decref_thread:  # pragma: no branch
+            self._queue.put_nowait((None, None, None))
+            self._decref_thread.join(1)
+
+    def put(self, key: str, session_ref: ref):
+        if self._decref_thread is None:
+            self.start()
+
+        fut = concurrent.futures.Future()
+        self._queue.put_nowait((key, session_ref, fut))
+        return fut
+
+
+_decref_runner = DecrefRunner()
+atexit.register(_decref_runner.stop)


 class _TileableSession:
@@ -38,18 +88,7 @@ def cb(_, sess=ref(session)):
             # isolation destroyed, no need to decref
             return

-        def decref():
-            from ...deploy.oscar.session import SyncSession
-
-            s = sess()
-            if s:
-                try:
-                    s = SyncSession.from_isolated_session(s)
-                    s.decref(key)
-                except (RuntimeError, ConnectionError, KeyError):
-                    pass
-
-        fut = _decref_pool.submit(decref)
+        fut = _decref_runner.put(key, sess)
         if not decref_in_isolation:
             # if decref in isolation, means that this tileable
             # is not required for main thread, thus we do not need
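
The change above replaces an unbounded ThreadPoolExecutor with a single lazily-started daemon thread that drains a queue of decref requests, resolving one concurrent.futures.Future per request, and registers an atexit hook that posts a sentinel to shut the thread down. Below is a minimal sketch of the same single-consumer pattern; the class name and the doubling "work" are illustrative stand-ins, not Mars APIs.

import atexit
import concurrent.futures
import queue
import threading


class SingleConsumerRunner:
    """Drain work items on one lazily-started daemon thread."""

    def __init__(self):
        self._thread = None
        self._queue = queue.Queue()

    def _body(self):
        while True:
            item, fut = self._queue.get()
            if item is None:  # sentinel posted by stop()
                break
            try:
                fut.set_result(item * 2)  # stand-in for the real work
            except Exception as ex:
                fut.set_exception(ex)

    def put(self, item):
        if self._thread is None:  # start the consumer on first use
            self._thread = threading.Thread(target=self._body, daemon=True)
            self._thread.start()
        fut = concurrent.futures.Future()
        self._queue.put_nowait((item, fut))
        return fut

    def stop(self):
        if self._thread is not None:
            self._queue.put_nowait((None, None))
            self._thread.join(1)


runner = SingleConsumerRunner()
atexit.register(runner.stop)
print(runner.put(21).result())  # 42

Compared with a pool, the single ordered queue keeps requests serialized and gives shutdown one well-defined drain point.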
4 changes: 3 additions & 1 deletion mars/core/graph/tests/test_graph.py
@@ -105,7 +105,7 @@ def test_to_dot():

     dot = str(graph.to_dot(trunc_key=5))
     try:
-        assert all(str(n.op.key)[:5] in dot for n in graph) is True
+        assert all(str(n.key)[:5] in dot for n in graph) is True
     except AssertionError:
         graph_reprs = []
         for n in graph:
@@ -117,4 +117,6 @@
             dot,
             "\n".join(graph_reprs),
         )
+        missing_prefix = next(str(n.key)[:5] for n in graph if str(n.key)[:5] not in dot)
+        logging.error("Missing prefix %s", missing_prefix)
         raise
4 changes: 3 additions & 1 deletion mars/deploy/oscar/session.py
@@ -1769,8 +1769,10 @@ def _attach_session(future: asyncio.Future):
                 execution_info, progress_bar, progress_update_interval, cancelled
             )
         else:
+            exec_task = asyncio.ensure_future(execution_info)
+            cancel_task = asyncio.ensure_future(cancelled.wait())
             await asyncio.wait(
-                [execution_info, cancelled.wait()], return_when=asyncio.FIRST_COMPLETED
+                [exec_task, cancel_task], return_when=asyncio.FIRST_COMPLETED
             )
         if cancelled.is_set():
             execution_info.remove_done_callback(_attach_session)
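
Since Python 3.8, handing bare coroutines to asyncio.wait() raises a DeprecationWarning, and Python 3.11 rejects them outright, hence the explicit asyncio.ensure_future() wrapping here; the same fix recurs in manager.py, scheduling/utils.py, and task/supervisor/processor.py below. A runnable sketch of the corrected pattern, using placeholder coroutines:

import asyncio


async def work():
    await asyncio.sleep(0.1)
    return "done"


async def main():
    cancelled = asyncio.Event()
    # Wrap coroutines in tasks before asyncio.wait(); bare coroutines
    # are deprecated there since Python 3.8.
    exec_task = asyncio.ensure_future(work())
    cancel_task = asyncio.ensure_future(cancelled.wait())
    done, pending = await asyncio.wait(
        [exec_task, cancel_task], return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # drop whichever side lost the race
    print(exec_task in done)  # True: work() finished first


asyncio.run(main())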
2 changes: 0 additions & 2 deletions mars/oscar/batch.py
@@ -15,7 +15,6 @@
 # limitations under the License.

 import asyncio
-import functools
 import inspect
 import textwrap
 from collections import namedtuple
@@ -198,7 +197,6 @@ def batch(self, func: Callable):
         self.batch_func = func
         return self

-    @functools.lru_cache(1000)
     def __get__(self, instance, owner):
         if instance is None:
             # calling from class
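
Dropping @functools.lru_cache(1000) from __get__ matters because an lru_cache on a method (or a descriptor's __get__) keys the cache on its arguments, including the instance, so every cached instance is held by a strong reference for the lifetime of the cache; presumably that retention is the concern here. A minimal demonstration of the pattern, with an illustrative Service class:

import functools
import gc
import weakref


class Service:
    @functools.lru_cache(1000)
    def compute(self, x):
        # The cache key is (self, x), so `self` is strongly referenced.
        return x * 2


s = Service()
alive = weakref.ref(s)
s.compute(1)
del s
gc.collect()
print(alive() is not None)  # True: the cache still holds the instance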
105 changes: 52 additions & 53 deletions mars/services/cluster/api/oscar.py
@@ -316,42 +316,43 @@ async def create(cls: Type[APIType], address: str, **kw) -> APIType:
         from ..supervisor.node_info import NodeInfoCollectorActor
         from ..uploader import NodeInfoUploaderActor

+        create_actor_coros = [
+            mo.create_actor(
+                SupervisorPeerLocatorActor,
+                "fixed",
+                address,
+                uid=SupervisorPeerLocatorActor.default_uid(),
+                address=address,
+            ),
+            mo.create_actor(
+                NodeInfoCollectorActor,
+                uid=NodeInfoCollectorActor.default_uid(),
+                address=address,
+            ),
+            mo.create_actor(
+                NodeAllocatorActor,
+                "fixed",
+                address,
+                uid=NodeAllocatorActor.default_uid(),
+                address=address,
+            ),
+            mo.create_actor(
+                NodeInfoUploaderActor,
+                NodeRole.WORKER,
+                interval=kw.get("upload_interval"),
+                band_to_slots=kw.get("band_to_slots"),
+                use_gpu=kw.get("use_gpu", False),
+                uid=NodeInfoUploaderActor.default_uid(),
+                address=address,
+            ),
+            mo.create_actor(
+                ProcessInfoManagerActor,
+                uid=ProcessInfoManagerActor.default_uid(),
+                address=address,
+            ),
+        ]
         dones, _ = await asyncio.wait(
-            [
-                mo.create_actor(
-                    SupervisorPeerLocatorActor,
-                    "fixed",
-                    address,
-                    uid=SupervisorPeerLocatorActor.default_uid(),
-                    address=address,
-                ),
-                mo.create_actor(
-                    NodeInfoCollectorActor,
-                    uid=NodeInfoCollectorActor.default_uid(),
-                    address=address,
-                ),
-                mo.create_actor(
-                    NodeAllocatorActor,
-                    "fixed",
-                    address,
-                    uid=NodeAllocatorActor.default_uid(),
-                    address=address,
-                ),
-                mo.create_actor(
-                    NodeInfoUploaderActor,
-                    NodeRole.WORKER,
-                    interval=kw.get("upload_interval"),
-                    band_to_slots=kw.get("band_to_slots"),
-                    use_gpu=kw.get("use_gpu", False),
-                    uid=NodeInfoUploaderActor.default_uid(),
-                    address=address,
-                ),
-                mo.create_actor(
-                    ProcessInfoManagerActor,
-                    uid=ProcessInfoManagerActor.default_uid(),
-                    address=address,
-                ),
-            ]
+            [asyncio.ensure_future(coro) for coro in create_actor_coros]
         )

         for task in dones:
@@ -370,22 +371,20 @@ async def cleanup(cls, address: str):
         from ..uploader import NodeInfoUploaderActor
         from ..supervisor.node_info import NodeInfoCollectorActor

-        await asyncio.wait(
-            [
-                mo.destroy_actor(
-                    mo.create_actor_ref(
-                        uid=SupervisorPeerLocatorActor.default_uid(), address=address
-                    )
-                ),
-                mo.destroy_actor(
-                    mo.create_actor_ref(
-                        uid=NodeInfoCollectorActor.default_uid(), address=address
-                    )
-                ),
-                mo.destroy_actor(
-                    mo.create_actor_ref(
-                        uid=NodeInfoUploaderActor.default_uid(), address=address
-                    )
-                ),
-            ]
+        await asyncio.gather(
+            mo.destroy_actor(
+                mo.create_actor_ref(
+                    uid=SupervisorPeerLocatorActor.default_uid(), address=address
+                )
+            ),
+            mo.destroy_actor(
+                mo.create_actor_ref(
+                    uid=NodeInfoCollectorActor.default_uid(), address=address
+                )
+            ),
+            mo.destroy_actor(
+                mo.create_actor_ref(
+                    uid=NodeInfoUploaderActor.default_uid(), address=address
+                )
+            ),
         )
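
Two different fixes appear in this file: the create path keeps asyncio.wait(), since it inspects the done set task by task, and wraps each coroutine via asyncio.ensure_future(), while the cleanup path switches to asyncio.gather(), which accepts bare coroutines directly and returns results in call order. A small sketch contrasting the two, with trivial stand-in coroutines:

import asyncio


async def make(name):
    await asyncio.sleep(0)
    return name


async def main():
    # gather: bare coroutines are fine; results come back in call order.
    print(await asyncio.gather(make("a"), make("b")))  # ['a', 'b']

    # wait: requires tasks and returns unordered (done, pending) sets.
    tasks = [asyncio.ensure_future(make(n)) for n in ("c", "d")]
    done, _ = await asyncio.wait(tasks)
    print(sorted(t.result() for t in done))  # ['c', 'd']


asyncio.run(main())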
7 changes: 4 additions & 3 deletions mars/services/scheduling/supervisor/manager.py
@@ -142,12 +142,13 @@ async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = Tr
                 band_tasks[band] += 1

         if band_tasks:
-            coros = []
+            tasks = []
             for band, subtask_count in band_tasks.items():
-                coros.append(
+                task = asyncio.ensure_future(
                     self._queueing_ref.submit_subtasks.tell(band, subtask_count)
                 )
-            await asyncio.wait(coros)
+                tasks.append(task)
+            await asyncio.wait(tasks)

     def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask]]:
         subtasks = []
3 changes: 2 additions & 1 deletion mars/services/scheduling/utils.py
@@ -56,5 +56,6 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks):
                 )
             )
         )
-        await asyncio.wait(coros)
+        tasks = [asyncio.ensure_future(coro) for coro in coros]
+        await asyncio.wait(tasks)
         raise
2 changes: 1 addition & 1 deletion mars/services/subtask/worker/processor.py
@@ -33,8 +33,8 @@
 from ....utils import get_chunk_key_to_data_keys
 from ...context import ThreadedServiceContext
 from ...meta.api import MetaAPI
-from ...storage import StorageAPI
 from ...session import SessionAPI
+from ...storage import StorageAPI
 from ...task import TaskAPI, task_options
 from ..core import Subtask, SubtaskStatus, SubtaskResult
2 changes: 1 addition & 1 deletion mars/services/subtask/worker/tests/test_subtask.py
@@ -176,7 +176,7 @@ def sleep(timeout: int):
         with Timer() as timer:
             # normal cancel by cancel asyncio Task
             aio_task = asyncio.create_task(
-                asyncio.wait_for(subtask_runner.cancel_subtask(), timeout=1)
+                asyncio.wait_for(asyncio.shield(subtask_runner.cancel_subtask()), timeout=1)
             )
             assert await subtask_runner.is_runner_free() is False
             with pytest.raises(asyncio.TimeoutError):
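
Wrapping cancel_subtask() in asyncio.shield() lets the surrounding wait_for() time out without cancelling the cancellation request itself, so the request keeps running after the TimeoutError fires. A standalone illustration of shield semantics; slow_cancel() is a stand-in for the real call:

import asyncio


async def slow_cancel():
    await asyncio.sleep(0.5)
    return "cancelled"


async def main():
    inner = asyncio.ensure_future(slow_cancel())
    try:
        # On timeout, wait_for cancels only the shield wrapper;
        # the inner task keeps running.
        await asyncio.wait_for(asyncio.shield(inner), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    print(inner.cancelled())  # False
    print(await inner)  # 'cancelled'


asyncio.run(main())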
3 changes: 2 additions & 1 deletion mars/services/task/supervisor/processor.py
@@ -538,7 +538,8 @@ async def start(self):

     async def wait(self, timeout: int = None):
         fs = [
-            processor.done.wait() for processor in self._task_id_to_processor.values()
+            asyncio.ensure_future(processor.done.wait())
+            for processor in self._task_id_to_processor.values()
         ]

         _, pending = yield asyncio.wait(fs, timeout=timeout)