Skip to content

Commit

Permalink
Merge pull request #297 from EESSI/support_job_handover_protocols
Browse files Browse the repository at this point in the history
first attempt at enabling two ways to submit/receive jobs
  • Loading branch information
casparvl authored Feb 13, 2025
2 parents 358ef01 + 156c963 commit 94ca197
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 37 deletions.
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,30 @@ package repositories. Typically these settings are set in the prologue of a
Slurm job. However, when entering the [EESSI compatibility layer](https://www.eessi.io/docs/compatibility_layer),
most environment settings are cleared. Hence, they need to be set again at a later stage.

```
job_delay_begin_factor = 2
```
The `job_delay_begin_factor` setting defines how many times the `poll_interval` a
job's begin (EligibleTime) from now should be delayed if the handover protocol
is set to `delayed_begin` (see setting `job_handover_protocol`). That is, if
the `job_delay_begin_factor` is set to five (5) the delay time is calculated as
5 * `poll_interval`. The event manager would use 2 as default value when
submitting jobs.

```
job_handover_protocol = hold_release
```
The `job_handover_protocol` setting defines which method is used to handover a
job from the event handler to the job manager. Values are
- `hold_release` (job is submitted with `--hold`, job manager removes the hold
with `scontrol release`)
- `delayed_begin` (job is submitted with `--begin=now+(5 * poll_interval)` and
any `--hold` is removed from the submission parameters); see setting
`poll_interval` further below; this is useful if the
bot account cannot run `scontrol release` to remove the hold of the job;
also, the status update in the PR comment of the job is extended by noting
the `EligibleTime`

```
job_name = JOB_NAME
```
Expand Down Expand Up @@ -665,12 +689,30 @@ scontrol_command = /usr/bin/scontrol
#### `[submitted_job_comments]` section

The `[submitted_job_comments]` section specifies templates for messages about newly submitted jobs.

DEPRECATED setting (use `awaits_release_delayed_begin_msg` and/or `awaits_release_hold_release_msg`)
```
awaits_release = job id `{job_id}` awaits release by job manager
```
`awaits_release` is used to provide a status update of a job (shown as a row in the job's status
table).

```
awaits_release_delayed_begin_msg = job id `{job_id}` will be eligible to start in about {delay_seconds} seconds
```
`awaits_release_delayed_begin_msg` is used when the `job_handover_protocol` is
set to `delayed_begin`. Note, both `{job_id}` and `{delay_seconds}` need to be
present in the value or the event handler will throw an exception when formatting
the update of the PR comment corresponding to the job.

```
awaits_release_hold_release_msg = job id `{job_id}` awaits release by job manager
```
`awaits_release_hold_release_msg` is used when the `job_handover_protocol` is
set to `hold_release`. Note, `{job_id}` needs to be present in the value or the
event handler will throw an exception when formatting the update of the PR
comment corresponding to the job.

```
initial_comment = New job on instance `{app_name}` for architecture `{arch_name}`{accelerator_spec} for repository `{repo_id}` in job dir `{symlink}`
```
Expand Down
23 changes: 22 additions & 1 deletion app.cfg.example
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,25 @@ container_cachedir = PATH_TO_SHARED_DIRECTORY
# http_proxy = http://PROXY_DNS:3128/
# https_proxy = http://PROXY_DNS:3128/

# The job_delay_begin_factor setting defines how many times the poll_interval a
# job's begin (EligibleTime) from now should be delayed if the handover protocol
# is set to `delayed_begin` (see setting `job_handover_protocol`). That is, if
# the job_delay_begin_factor is set to five (5) the delay time is calculated as
# 5 * poll_interval. The event manager would use 2 as the default factor when
# submitting jobs.
job_delay_begin_factor = 2

# The job_handover_protocol setting defines which method is used to handover a
# job from the event handler to the job manager. Values are
# - hold_release (job is submitted with '--hold', job manager removes the hold
# with 'scontrol release')
# - delayed_begin (job is submitted with '--begin=now+(5 * poll_interval)' and
# any '--hold' is removed from the submission parameters); this is useful if the
# bot account cannot run 'scontrol release' to remove the hold of the job;
# also, the status update in the PR comment of the job is extended by noting
# the 'EligibleTime'
job_handover_protocol = hold_release

# Used to give all jobs of a bot instance the same name. Can be used to allow
# multiple bot instances running on the same Slurm cluster.
job_name = prod
Expand Down Expand Up @@ -257,12 +276,14 @@ scontrol_command = /usr/bin/scontrol
# information.
[submitted_job_comments]
awaits_release = job id `{job_id}` awaits release by job manager
awaits_release_delayed_begin_msg = job id `{job_id}` will be eligible to start in about {delay_seconds} seconds
awaits_release_hold_release_msg = job id `{job_id}` awaits release by job manager
initial_comment = New job on instance `{app_name}` for CPU micro-architecture `{arch_name}`{accelerator_spec} for repository `{repo_id}` in job dir `{symlink}`
with_accelerator =  and accelerator `{accelerator}`


[new_job_comments]
awaits_launch = job awaits launch by Slurm scheduler
awaits_launch = job awaits launch by Slurm scheduler{extra_info}

[running_job_comments]
running_job = job `{job_id}` is running
Expand Down
10 changes: 9 additions & 1 deletion eessi_bot_event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
# config.BUILDENV_SETTING_CVMFS_CUSTOMIZATIONS, # optional
# config.BUILDENV_SETTING_HTTPS_PROXY, # optional
# config.BUILDENV_SETTING_HTTP_PROXY, # optional
# config.BUILDENV_SETTING_JOB_DELAY_BEGIN_FACTOR, # optional (default: 2)
config.BUILDENV_SETTING_JOB_HANDOVER_PROTOCOL, # required
config.BUILDENV_SETTING_JOB_NAME, # required
config.BUILDENV_SETTING_JOBS_BASE_DIR, # required
# config.BUILDENV_SETTING_LOAD_MODULES, # optional
Expand Down Expand Up @@ -92,12 +94,18 @@
config.GITHUB_SETTING_APP_NAME, # required
config.GITHUB_SETTING_INSTALLATION_ID, # required
config.GITHUB_SETTING_PRIVATE_KEY], # required
# the poll interval setting is required for the alternative job handover
# protocol (delayed_begin)
config.SECTION_JOB_MANAGER: [
config.JOB_MANAGER_SETTING_POLL_INTERVAL], # required
config.SECTION_REPO_TARGETS: [
config.REPO_TARGETS_SETTING_REPO_TARGET_MAP, # required
config.REPO_TARGETS_SETTING_REPOS_CFG_DIR], # required
config.SECTION_SUBMITTED_JOB_COMMENTS: [
config.SUBMITTED_JOB_COMMENTS_SETTING_INITIAL_COMMENT, # required
config.SUBMITTED_JOB_COMMENTS_SETTING_AWAITS_RELEASE, # required
# config.SUBMITTED_JOB_COMMENTS_SETTING_AWAITS_RELEASE, # optional
config.SUBMITTED_JOB_COMMENTS_SETTING_AWAITS_RELEASE_DELAYED_BEGIN_MSG, # required
config.SUBMITTED_JOB_COMMENTS_SETTING_AWAITS_RELEASE_HOLD_RELEASE_MSG, # required
config.SUBMITTED_JOB_COMMENTS_SETTING_WITH_ACCELERATOR], # required
}

Expand Down
80 changes: 59 additions & 21 deletions eessi_bot_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
# settings that are required in 'app.cfg'
REQUIRED_CONFIG = {
config.SECTION_BUILDENV: [
config.BUILDENV_SETTING_JOB_HANDOVER_PROTOCOL, # required
config.BUILDENV_SETTING_JOB_NAME], # required
config.SECTION_FINISHED_JOB_COMMENTS: [
config.FINISHED_JOB_COMMENTS_SETTING_JOB_RESULT_UNKNOWN_FMT, # required
Expand Down Expand Up @@ -91,6 +92,9 @@ def __init__(self):
self.job_name = buildenv_cfg.get(config.BUILDENV_SETTING_JOB_NAME)
if self.job_name and len(self.job_name) < 3:
raise Exception(f"job name ({self.job_name}) is shorter than 3 characters")
self.job_handover_protocol = buildenv_cfg.get(config.BUILDENV_SETTING_JOB_HANDOVER_PROTOCOL)
if self.job_handover_protocol not in config.JOB_HANDOVER_PROTOCOLS_SET:
raise Exception(f"job handover protocol ({self.job_handover_protocol}) is unknown")

def get_current_jobs(self):
"""
Expand Down Expand Up @@ -256,6 +260,25 @@ def determine_finished_jobs(self, known_jobs, current_jobs):

return finished_jobs

def parse_scontrol_show_job_output(self, output):
"""
The output of 'scontrol --oneliner show job' is a list of key=value pairs
separated by whitespaces.
Args:
output (string): the output of the scontrol command
Returns:
(dict): Returns a dictionary of the key-value pairs
"""
job_info = {}
stripped_output = output.strip()
for pair in stripped_output.split():
key, value = pair.split('=', 1)
job_info[key] = value

return job_info

def process_new_job(self, new_job):
"""
Process a new job by verifying that it is a bot job and if so
Expand Down Expand Up @@ -283,19 +306,20 @@ def process_new_job(self, new_job):
log_file=self.logfile,
)

# parse output of 'scontrol_cmd' to determine the job's working
# directory
match = re.search(r".* WorkDir=(\S+) .*",
str(scontrol_output))
if match:
# parse output of 'scontrol_cmd'
job_info = self.parse_scontrol_show_job_output(str(scontrol_output))

# check if job_info contains 'WorkDir', if not we cannot process the job
# further
if 'WorkDir' in job_info:
log(
"process_new_job(): work dir of job %s: '%s'"
% (job_id, match.group(1)),
% (job_id, job_info['WorkDir']),
self.logfile,
)

job_metadata_path = "%s/_bot_job%s.metadata" % (
match.group(1),
job_info['WorkDir'],
job_id,
)

Expand All @@ -313,21 +337,34 @@ def process_new_job(self, new_job):
symlink_source = os.path.join(self.submitted_jobs_dir, job_id)
log(
"process_new_job(): create a symlink: %s -> %s"
% (symlink_source, match.group(1)),
% (symlink_source, job_info['WorkDir']),
self.logfile,
)
os.symlink(match.group(1), symlink_source)

release_cmd = "%s release %s" % (
self.scontrol_command,
job_id,
)
os.symlink(job_info['WorkDir'], symlink_source)

# handle different job handover protocols
# *_HOLD_RELEASE: job was submitted with '--hold' and shall be
# released with 'scontrol release JOB_ID'
# *_DELAYED_BEGIN: job was submitted with '--begin=now+SOMEDELAY',
# no extra action is needed
job_status = ''
extra_info = ''
if self.job_handover_protocol == config.JOB_HANDOVER_PROTOCOL_HOLD_RELEASE:
release_cmd = "%s release %s" % (
self.scontrol_command,
job_id,
)

release_output, release_err, release_exitcode = run_cmd(
release_cmd,
"process_new_job(): scontrol command",
log_file=self.logfile,
)
release_output, release_err, release_exitcode = run_cmd(
release_cmd,
"process_new_job(): scontrol command",
log_file=self.logfile,
)
job_status = 'released'
extra_info = ''
elif self.job_handover_protocol == config.JOB_HANDOVER_PROTOCOL_DELAYED_BEGIN:
job_status = 'received'
extra_info = " (eligible to start from {job_info['EligibleTime'})"

# update PR defined by repo and pr_number stored in the job's
# metadata file
Expand Down Expand Up @@ -356,8 +393,9 @@ def process_new_job(self, new_job):
if "comment_id" in new_job:
new_job_comments_cfg = config.read_config()[config.SECTION_NEW_JOB_COMMENTS]
dt = datetime.now(timezone.utc)
update = "\n|%s|released|" % dt.strftime("%b %d %X %Z %Y")
update += f"{new_job_comments_cfg[config.NEW_JOB_COMMENTS_SETTING_AWAITS_LAUNCH]}|"
update = "\n|%s|%s|" % (dt.strftime("%b %d %X %Z %Y"), job_status)
description_col_fmt = new_job_comments_cfg[config.NEW_JOB_COMMENTS_SETTING_AWAITS_LAUNCH]
update += f"{description_col_fmt.format(extra_info=extra_info)}|"
update_comment(new_job["comment_id"], pr, update)
else:
log(
Expand Down
75 changes: 61 additions & 14 deletions tasks/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,30 @@ def get_build_env_cfg(cfg):
log(f"{fn}(): submit_command '{submit_command}'")
config_data[config.BUILDENV_SETTING_SUBMIT_COMMAND] = submit_command

job_handover_protocol = buildenv.get(config.BUILDENV_SETTING_JOB_HANDOVER_PROTOCOL)
slurm_params = buildenv.get(config.BUILDENV_SETTING_SLURM_PARAMS)
# always submit jobs with hold set, so job manager can release them
slurm_params += ' --hold'
if job_handover_protocol == config.JOB_HANDOVER_PROTOCOL_HOLD_RELEASE:
# always submit jobs with hold set, so job manager can release them
slurm_params += ' --hold'
elif job_handover_protocol == config.JOB_HANDOVER_PROTOCOL_DELAYED_BEGIN:
# alternative method to submit without '--hold' and
# '--begin=now+factor*poll_interval' instead
# 1. remove '--hold' if any
# 2. add '--begin=now+factor*poll_interval'
# factor defined by setting 'job_delay_begin_factor' (default: 2)
slurm_params = slurm_params.replace('--hold', '')
job_manger_cfg = cfg[config.SECTION_JOB_MANAGER]
poll_interval = int(job_manger_cfg.get(config.JOB_MANAGER_SETTING_POLL_INTERVAL))
job_delay_begin_factor = float(buildenv.get(config.BUILDENV_SETTING_JOB_DELAY_BEGIN_FACTOR, 2))
slurm_params += f' --begin=now+{int(job_delay_begin_factor * poll_interval)}'
else:
slurm_params += ' --hold'
log(
f"{fn}(): unknown job handover protocol in app.cfg"
f" ('{config.BUILDENV_SETTING_JOB_HANDOVER_PROTOCOL} = {job_handover_protocol}');"
f" added '--hold' as default"
)

log(f"{fn}(): slurm_params '{slurm_params}'")
config_data[config.BUILDENV_SETTING_SLURM_PARAMS] = slurm_params

Expand Down Expand Up @@ -899,18 +920,44 @@ def create_pr_comment(job, job_id, app_name, pr, gh, symlink):
dt = datetime.now(timezone.utc)

# construct initial job comment
job_comment = (f"{submitted_job_comments_cfg[config.SUBMITTED_JOB_COMMENTS_SETTING_INITIAL_COMMENT]}"
f"\n|date|job status|comment|\n"
f"|----------|----------|------------------------|\n"
f"|{dt.strftime('%b %d %X %Z %Y')}|"
f"submitted|"
f"{submitted_job_comments_cfg[config.SUBMITTED_JOB_COMMENTS_SETTING_AWAITS_RELEASE]}|").format(
app_name=app_name,
arch_name=arch_name,
symlink=symlink,
repo_id=job.repo_id,
job_id=job_id,
accelerator_spec=accelerator_spec_str)
buildenv = config.read_config()[config.SECTION_BUILDENV]
job_handover_protocol = buildenv.get(config.BUILDENV_SETTING_JOB_HANDOVER_PROTOCOL)
if job_handover_protocol == config.JOB_HANDOVER_PROTOCOL_DELAYED_BEGIN:
release_msg_string = config.SUBMITTED_JOB_COMMENTS_SETTING_AWAITS_RELEASE_DELAYED_BEGIN_MSG
release_comment_template = submitted_job_comments_cfg[release_msg_string]
# calculate delay from poll_interval and delay_factor
job_manager_cfg = config.read_config()[config.SECTION_JOB_MANAGER]
poll_interval = int(job_manager_cfg.get(config.JOB_MANAGER_SETTING_POLL_INTERVAL))
delay_factor = float(buildenv.get(config.BUILDENV_SETTING_JOB_DELAY_BEGIN_FACTOR, 2))
eligible_in_seconds = int(poll_interval * delay_factor)
job_comment = (f"{submitted_job_comments_cfg[config.SUBMITTED_JOB_COMMENTS_SETTING_INITIAL_COMMENT]}"
f"\n|date|job status|comment|\n"
f"|----------|----------|------------------------|\n"
f"|{dt.strftime('%b %d %X %Z %Y')}|"
f"submitted|"
f"{release_comment_template}|").format(
app_name=app_name,
arch_name=arch_name,
symlink=symlink,
repo_id=job.repo_id,
job_id=job_id,
delay_seconds=eligible_in_seconds,
accelerator_spec=accelerator_spec_str)
else:
release_msg_string = config.SUBMITTED_JOB_COMMENTS_SETTING_AWAITS_RELEASE_HOLD_RELEASE_MSG
release_comment_template = submitted_job_comments_cfg[release_msg_string]
job_comment = (f"{submitted_job_comments_cfg[config.SUBMITTED_JOB_COMMENTS_SETTING_INITIAL_COMMENT]}"
f"\n|date|job status|comment|\n"
f"|----------|----------|------------------------|\n"
f"|{dt.strftime('%b %d %X %Z %Y')}|"
f"submitted|"
f"{release_comment_template}|").format(
app_name=app_name,
arch_name=arch_name,
symlink=symlink,
repo_id=job.repo_id,
job_id=job_id,
accelerator_spec=accelerator_spec_str)

# create comment to pull request
repo_name = pr.base.repo.full_name
Expand Down
3 changes: 3 additions & 0 deletions tests/test_app.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
# sample config file for tests (some functions run config.read_config()
# which reads app.cfg by default)
[buildenv]
job_handover_protocol = hold_release

[job_manager]

# variable 'comment' under 'submitted_job_comments' should not be changed as there are regular expression patterns matching it
[submitted_job_comments]
awaits_release = job id `{job_id}` awaits release by job manager
awaits_release_delayed_begin_msg = job id `{job_id}` will be eligible to start in about {delay_seconds} seconds
awaits_release_hold_release_msg = job id `{job_id}` awaits release by job manager
initial_comment = New job on instance `{app_name}` for CPU micro-architecture `{arch_name}`{accelerator_spec} for repository `{repo_id}` in job dir `{symlink}`
with_accelerator = &nbsp;and accelerator `{accelerator}`

Expand Down
Loading

0 comments on commit 94ca197

Please sign in to comment.