Skip to content

Commit

Permalink
Fix respawning of pool instances have different names (#2493)
Browse files Browse the repository at this point in the history
* Using current names

* enhancing test

* Update tests/test_pool.py

* empty commit to trigger CICD

* removing name argument since it is not used by wrapped functions

* Using a clear option for thread name.
Getting back the name/thread_name argument to pool_monitor_thread

* empty commit

* Avoiding timeout

* Increasing timeout and unmarking test

* disabling rerun

* Adding name to spawn_mapdl

* Adding more checks while spawning instances

* forcing override (deleting lock file)

* fixing conftest requirement for windows libraries
  • Loading branch information
germa89 authored Jan 4, 2024
1 parent f43c3a6 commit 04aae4f
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 19 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ env:
DOCKER_PACKAGE: ghcr.io/ansys/mapdl
DOCKER_IMAGE_VERSION_DOCS_BUILD: v24.1-ubuntu-student
ON_CI: True
PYTEST_ARGUMENTS: '-vv --durations=10 --maxfail=2 --reruns 2 --reruns-delay 1 --cov=ansys.mapdl.core --cov-report=html'
PYTEST_ARGUMENTS: '-vv --durations=10 --maxfail=3 --reruns 3 --reruns-delay 4 --cov=ansys.mapdl.core --cov-report=html'

# Following env vars when changed will "reset" the mentioned cache,
# by changing the cache file name. It is rendered as ...-v%RESET_XXX%-...
Expand Down Expand Up @@ -496,6 +496,7 @@ jobs:
P_SCHEMA: "/ansys_inc/v241/ansys/ac4/schema"
PYTEST_TIMEOUT: 120 # seconds. Limit the duration for each unit test


steps:
- name: "Install Git and checkout project"
uses: actions/checkout@v4
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ tests = [
"vtk==9.3.0",
"pytest-rerunfailures==13.0",
"pytest-pyvista==0.1.9",
"pytest-timeout==2.2.0",
]
doc = [
"sphinx==7.2.6",
Expand Down
4 changes: 3 additions & 1 deletion src/ansys/mapdl/core/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,9 @@ def threaded_daemon(func):

@wraps(func)
def wrapper(*args, **kwargs):
name = kwargs.get("name", f"Threaded (with Daemon) `{func.__name__}` function")
name = kwargs.pop(
"thread_name", f"Threaded (with Daemon) `{func.__name__}` function"
)
thread = Thread(target=func, name=name, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
Expand Down
36 changes: 27 additions & 9 deletions src/ansys/mapdl/core/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ def __init__(

# threaded spawn
threads = [
self._spawn_mapdl(i, ports[i], pbar, name=self._names(i))
self._spawn_mapdl(
i, ports[i], pbar, name=self._names(i), thread_name=self._names(i)
)
for i in range(n_instances)
]
if wait:
Expand All @@ -258,7 +260,10 @@ def __init__(

# monitor pool if requested
if restart_failed:
self._pool_monitor_thread = self._monitor_pool(name="Monitoring_Thread")
# This name is using the wrapped to specify the thread name
self._pool_monitor_thread = self._monitor_pool(
thread_name="Monitoring_Thread"
)

self._verify_unique_ports()

Expand Down Expand Up @@ -383,12 +388,12 @@ def map(
pbar = tqdm(total=n, desc="MAPDL Running")

@threaded_daemon
def func_wrapper(obj, func, timeout, args=None, name=""):
def func_wrapper(obj, func, timeout, args=None):
"""Expect obj to be an instance of Mapdl"""
complete = [False]

@threaded_daemon
def run(name=""):
def run():
if args is not None:
if isinstance(args, (tuple, list)):
results.append(func(obj, *args))
Expand All @@ -398,7 +403,7 @@ def run(name=""):
results.append(func(obj))
complete[0] = True

run_thread = run(name="map.run")
run_thread = run(thread_name="map.run")
if timeout:
tstart = time.time()
while not complete[0]:
Expand Down Expand Up @@ -443,7 +448,9 @@ def run(name=""):
instance = self.next_available()
instance.locked = True
threads.append(
func_wrapper(instance, func, timeout, args, name="Map_Thread")
func_wrapper(
instance, func, timeout, args, thread_name="Map_Thread"
)
)

if close_when_finished:
Expand Down Expand Up @@ -692,7 +699,7 @@ def _spawn_mapdl(
self._instances[index] = launch_mapdl(
run_location=run_location,
port=port,
override=self._override,
override=True,
**self._spawn_kwargs,
)

Expand All @@ -701,29 +708,40 @@ def _spawn_mapdl(
while self._instances[index] is None:
time.sleep(0.1)

assert not self._instances[index].exited
self._instances[index].prep7()

# LOG.debug("Spawned instance %d. Name '%s'", index, name)
if pbar is not None:
pbar.update(1)

self._spawning_i -= 1

@threaded_daemon
def _monitor_pool(self, refresh=1.0, name=""):
def _monitor_pool(self, refresh=1.0):
"""Checks if instances within a pool have exited (failed) and
restarts them.
"""
while self._active:
for index, instance in enumerate(self._instances):
name = self._names[index]
if not instance: # encountered placeholder
continue

if instance._exited:
try:
# use the next port after the current available port
self._spawning_i += 1
port = max(self._ports) + 1
self._spawn_mapdl(
index, port=port, name=f"Instance {index}"
index,
port=port,
name=name,
thread_name=name,
).join()

except Exception as e:
LOG.error(e, exc_info=True)
self._spawning_i -= 1
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@

def has_dependency(requirement):
try:
if os.name == "nt":
requirement = requirement.replace("-", ".")
import_module(requirement)
return True
except ModuleNotFoundError:
Expand Down
18 changes: 11 additions & 7 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
not os.path.isfile(MAPDL194PATH), reason="Requires MAPDL 194"
)

TWAIT = 90
TWAIT = 100
NPROC = 1


Expand Down Expand Up @@ -88,7 +88,7 @@ def pool(tmpdir_factory):
while len(mapdl_pool) != 0:
time.sleep(0.1)
if time.time() > timeout:
raise TimeoutError(f"Failed to restart instance in {TWAIT} seconds")
raise TimeoutError(f"Failed to kill instance in {TWAIT} seconds")

assert len(mapdl_pool) == 0

Expand All @@ -110,13 +110,14 @@ def test_invalid_exec():
)


@pytest.mark.xfail(strict=False, reason="Flaky test. See #2435")
# @pytest.mark.xfail(strict=False, reason="Flaky test. See #2435")
@requires("local")
def test_heal(pool):
pool_sz = len(pool)
pool_names = pool._names # copy pool names

# Killing one instance
pool[0].exit()
pool[1].exit()
pool[2].exit()

time.sleep(1) # wait for shutdown
timeout = time.time() + TWAIT
Expand All @@ -125,6 +126,7 @@ def test_heal(pool):
if time.time() > timeout:
raise TimeoutError(f"Failed to restart instance in {TWAIT} seconds")

assert pool._names == pool_names
assert len(pool) == pool_sz
pool._verify_unique_ports()

Expand All @@ -151,10 +153,12 @@ def func(mapdl, tsleep):

timeout = 2
times = np.array([0, 1, 3, 4])
output = pool.map(func, times, timeout=timeout)
output = pool.map(func, times, timeout=timeout, wait=True)

assert len(output) == (times < timeout).sum()

# wait for the pool to heal before continuing
# the timeout option kills the MAPDL instance when we reach the timeout.
# Let's wait for the pool to heal before continuing
timeout = time.time() + TWAIT
while len(pool) < pool_sz:
time.sleep(0.1)
Expand Down

0 comments on commit 04aae4f

Please sign in to comment.