Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flux update #359

Merged
merged 9 commits into from
May 25, 2021
41 changes: 41 additions & 0 deletions maestrowf/abstracts/interfaces/flux.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,50 @@
from abc import ABC, abstractclassmethod, abstractmethod, \
abstractstaticmethod
import logging

LOGGER = logging.getLogger(__name__)

try:
import flux
except ImportError:
LOGGER.info("Failed to import Flux. Continuing.")


class FluxInterface(ABC):

@classmethod
def connect_to_flux(cls):
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux handle created.")
broker_version = cls.flux_handle.attr_get("version")
adaptor_version = cls.key
LOGGER.debug(
"Connected to Flux broker running version %s using Maestro "
"adapter version %s.", broker_version, adaptor_version)
try:
from distutils.version import StrictVersion
adaptor_version = StrictVersion(adaptor_version)
broker_version = StrictVersion(broker_version)
if adaptor_version > broker_version:
LOGGER.error(
"Maestro adapter version (%s) is too new for the Flux "
"broker version (%s). Functionality not present in "
"this Flux version may be required by the adapter and "
"cause errors. Please switch to an older adapter.",
adaptor_version, broker_version
)
elif adaptor_version < broker_version:
LOGGER.debug(
"Maestro adaptor version (%s) is older than the Flux "
"broker version (%s). This is usually OK, but if a "
"newer Maestro adapter is available, please consider "
"upgrading to maximize performance and compatibility.",
adaptor_version, broker_version
)
except ImportError:
pass

@abstractclassmethod
def get_statuses(cls, joblist):
"""
Expand Down
12 changes: 3 additions & 9 deletions maestrowf/interfaces/script/_flux/flux0_17_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=False
):
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule all
Expand Down Expand Up @@ -160,9 +158,7 @@ def status_callback(future, args):
def get_statuses(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down Expand Up @@ -245,9 +241,7 @@ def cancel(cls, joblist):
"""
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down
13 changes: 3 additions & 10 deletions maestrowf/interfaces/script/_flux/flux0_18_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
try:
from flux import constants as flux_constants
from flux import job as flux_job
from flux import Flux
except ImportError:
LOGGER.info("Failed to import Flux. Continuing.")

Expand Down Expand Up @@ -64,9 +63,7 @@ def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=False
):
if not cls.flux_handle:
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule all
Expand Down Expand Up @@ -174,9 +171,7 @@ def status_callback(future, args):
def get_statuses(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down Expand Up @@ -257,9 +252,7 @@ def cancel(cls, joblist):
"""
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down
24 changes: 10 additions & 14 deletions maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ class FluxInterface_0260(FluxInterface):
@classmethod
def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=False
ngpus=0, job_name=None, force_broker=True
):
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule all
Expand All @@ -37,6 +35,9 @@ def submit(
# single node, don't use a broker -- but introduce a flag that can
# force a single node to run in a broker.

if isinstance(ngpus, str) and ngpus.isdigit():
FrankD412 marked this conversation as resolved.
Show resolved Hide resolved
ngpus = int(ngpus)

if force_broker or nodes > 1:
LOGGER.debug(
"Launch under Flux sub-broker. [force_broker=%s, nodes=%d]",
Expand All @@ -45,7 +46,7 @@ def submit(
ngpus_per_slot = int(ceil(ngpus / nodes))
jobspec = flux.job.JobspecV1.from_nest_command(
[path], num_nodes=nodes, cores_per_slot=cores_per_task,
num_slots=nodes, gpus_per_slot=ngpus_per_slot)
num_slots=procs, gpus_per_slot=ngpus_per_slot)
else:
LOGGER.debug(
"Launch under root Flux broker. [force_broker=%s, nodes=%d]",
Expand Down Expand Up @@ -102,9 +103,8 @@ def parallelize(cls, procs, nodes=None, **kwargs):

if "gpus" in kwargs:
ngpus = str(kwargs["gpus"])
if ngpus.isdecimal():
args.append("-g")
args.append(ngpus)
args.append("-g")
args.append(ngpus)

# flux has additional arguments that can be passed via the '-o' flag.
addtl = []
Expand All @@ -122,9 +122,7 @@ def parallelize(cls, procs, nodes=None, **kwargs):
def get_statuses(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down Expand Up @@ -157,9 +155,7 @@ def cancel(cls, joblist):
"""
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down
13 changes: 9 additions & 4 deletions maestrowf/interfaces/script/fluxscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def submit(self, step, path, cwd, job_map=None, env=None):
# walltime = self._convert_walltime_to_seconds(step.run["walltime"])
nodes = step.run.get("nodes")
processors = step.run.get("procs", 0)
force_broker = step.run.get("use_broker", False)
force_broker = step.run.get("use_broker", True)
walltime = step.run.get("walltime", "inf")

# Compute cores per task
Expand All @@ -183,9 +183,14 @@ def submit(self, step, path, cwd, job_map=None, env=None):
"'cores per task' set to a non-value. Populating with a "
"sensible default. (cores per task = %d", cores_per_task)

# Calculate ngpus
ngpus = step.run.get("gpus", 0)
ngpus = 0 if not ngpus else ngpus
try:
# Calculate ngpus
ngpus = step.run.get("gpus", "0")
ngpus = int(ngpus) if ngpus else 0
FrankD412 marked this conversation as resolved.
Show resolved Hide resolved
except ValueError as val_error:
msg = f"Specified gpus '{ngpus}' is not a decimal value."
LOGGER.error(msg)
raise val_error

# Calculate nprocs
ncores = cores_per_task * nodes
Expand Down