Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding timeout to pool.__getitem__ #2176

Merged
merged 4 commits into from
Jul 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 65 additions & 16 deletions src/ansys/mapdl/core/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import shutil
import time
from typing import Any, Dict, List, Optional
import warnings

from ansys.mapdl.core import LOG, get_ansys_path, launch_mapdl
Expand All @@ -15,11 +16,11 @@
from tqdm import tqdm


def available_ports(n_ports, starting_port=MAPDL_DEFAULT_PORT):
def available_ports(n_ports: int, starting_port: int = MAPDL_DEFAULT_PORT) -> List[int]:
"""Return a list of the first ``n_ports`` ports starting from ``starting_port``."""

port = MAPDL_DEFAULT_PORT
ports = []
ports: List[int] = []
while port < 65536 and len(ports) < n_ports:
if not port_in_use(port):
ports.append(port)
Expand Down Expand Up @@ -109,21 +110,23 @@ class LocalMapdlPool:

def __init__(
self,
n_instances,
wait=True,
run_location=None,
port=MAPDL_DEFAULT_PORT,
progress_bar=True,
restart_failed=True,
remove_temp_files=True,
n_instances: int,
wait: bool = True,
run_location: Optional[str] = None,
port: int = MAPDL_DEFAULT_PORT,
progress_bar: bool = True,
restart_failed: bool = True,
remove_temp_files: bool = True,
**kwargs,
):
) -> None:
"""Initialize several instances of mapdl"""
self._instances = []
self._root_dir = run_location
self._instances: List[None] = []
self._root_dir: str = run_location
kwargs["remove_temp_files"] = remove_temp_files
kwargs["mode"] = "grpc"
self._spawn_kwargs = kwargs
self._spawn_kwargs: Dict[str, Any] = kwargs
self._spawning_i: int = 0
self._exiting_i: int = 0

# verify that mapdl is 2021R1 or newer
if "exec_file" in kwargs:
Expand Down Expand Up @@ -191,7 +194,23 @@ def __init__(

self._verify_unique_ports()

def _verify_unique_ports(self):
@property
def _spawning(self) -> bool:
"""Return ``True`` when the ``_spawning_i`` counter is non-zero."""
# Spawning is threaded, so more than one spawn can be in flight at once.
# The state is therefore tracked with a counter instead of a bool; any
# non-zero value means a spawn is in progress.

return self._spawning_i != 0

@property
def _exiting(self) -> bool:
"""Return ``True`` when the ``_exiting_i`` counter is non-zero."""
# Exiting is threaded, so more than one exit can be in flight at once.
# The state is therefore tracked with a counter instead of a bool; any
# non-zero value means an exit is in progress.

return self._exiting_i != 0

def _verify_unique_ports(self) -> None:
"""Raise ``MapdlRuntimeError`` if ``len(self._ports)`` differs from ``len(self)``."""
if len(self._ports) != len(self):
raise MapdlRuntimeError("MAPDLPool has overlapping ports")

Expand Down Expand Up @@ -367,9 +386,11 @@ def run(name=""):
self._instances[i] = None

try:
self._exiting_i += 1
instance.exit()
except Exception as e:
LOG.error("Failed to close instance", exc_info=True)
self._exiting_i -= 1

else:
# wait for all threads to complete
Expand Down Expand Up @@ -532,12 +553,14 @@ def exit(self, block=False):
@threaded
def threaded_exit(index, instance):
"""Exit ``instance`` (if any) and clear its slot in ``self._instances``."""
if instance:
# Counted so that the ``_exiting_i`` counter reflects an exit in progress.
self._exiting_i += 1
try:
instance.exit()
except:
# NOTE(review): bare ``except`` silently discards any error raised by
# ``instance.exit()`` -- consider narrowing it and logging the failure.
pass
self._instances[index] = None
# LOG.debug("Exited instance: %s", str(instance))
self._exiting_i -= 1

threads = []
for i, instance in enumerate(self):
Expand All @@ -554,8 +577,25 @@ def __len__(self):
count += 1
return count

def __getitem__(self, key):
def __getitem__(self, key: int):
"""Return an instance by an index.

If the slot at ``key`` is ``None`` while a spawn is in progress, poll every
0.1 s for at most 10 s until the slot is filled or spawning stops. The slot
is then returned as-is, so the result can still be ``None``.
"""

# Regarding issue 2173.
# There are two possible reasons for the slot being ``None`` here:
# * the MAPDL instance hasn't been created yet (spawning is threaded), or
# * it died and it hasn't been relaunched.
# Because we are seeing some random errors, I would bet on the first.
time0 = time.time()
timeout = 10 # seconds
while (
self._instances[key] is None
and time.time() < (time0 + timeout)
and self._spawning
):
time.sleep(0.1)
# We could respawn an instance here, but other parts of the code
# already check whether the instance is None.

return self._instances[key]

def __iter__(self):
Expand All @@ -565,9 +605,13 @@ def __iter__(self):
yield instance

@threaded_daemon
def _spawn_mapdl(self, index, port=None, pbar=None, name=""):
def _spawn_mapdl(
self, index: int, port: int = None, pbar: Optional[bool] = None, name: str = ""
):
"""Spawn a mapdl instance at an index"""
# create a new temporary directory for each instance
self._spawning_i += 1

run_location = create_temp_dir(self._root_dir)
self._instances[index] = launch_mapdl(
run_location=run_location, port=port, **self._spawn_kwargs
Expand All @@ -576,6 +620,8 @@ def _spawn_mapdl(self, index, port=None, pbar=None, name=""):
if pbar is not None:
pbar.update(1)

self._spawning_i -= 1

@threaded_daemon
def _monitor_pool(self, refresh=1.0, name=""):
"""Checks if instances within a pool have exited (failed) and
Expand All @@ -588,12 +634,15 @@ def _monitor_pool(self, refresh=1.0, name=""):
if instance._exited:
try:
# use the next port after the current available port
self._spawning_i += 1
port = max(self._ports) + 1
self._spawn_mapdl(
index, port=port, name=f"Instance {index}"
).join()
except Exception as e:
LOG.error(e, exc_info=True)
self._spawning_i -= 1

time.sleep(refresh)

@property
Expand Down