Bulk Executor and HTCondor Bulk Operations #224

Merged: 58 commits, Feb 9, 2022 (changes shown below are from 44 of the 58 commits)

Commits
All commits by maxfischer2781.

3491a86  draft for bulk execution (Dec 22, 2021)
0db8645  documented intended usage (Dec 22, 2021)
973bdbb  typo fix (Dec 22, 2021)
6e47c3b  typed Executor signature (Jan 3, 2022)
9f8e5cc  handle empty bulk, enforce order (Jan 3, 2022)
78404b0  bulk rm (Jan 3, 2022)
8f3a07b  quoted types (Jan 3, 2022)
85cdab2  Protocol from typing_extensions (Jan 3, 2022)
8fb9287  lazy initialising semaphore (Jan 3, 2022)
414b60d  quoted types (Jan 3, 2022)
676cfa0  Prior to Python 3.7, the low-level asyncio.ensure_future() function c… (Jan 3, 2022)
2fd2923  reading bulk settings from config (Jan 4, 2022)
e7468b5  bulk execution checks for matching output (Jan 4, 2022)
9157397  error formatting (Jan 4, 2022)
76b5ea5  using bulk submit/suspend/remove (Jan 4, 2022)
d95e0ed  pre-3.8 call_args.kwargs (Jan 4, 2022)
1c4014e  BulkExecution is directly callable (Jan 5, 2022)
071b1db  added some crisp typing lingo (Jan 5, 2022)
a0b5dd8  bulk commands take variadic args instead of tuple (Jan 5, 2022)
ecc71e4  BulkCommand spec (Jan 5, 2022)
d4f4713  BulkExecution argument is anonymous (Jan 5, 2022)
d33a880  code reduction (Jan 5, 2022)
c21507f  flattened package layout (Jan 5, 2022)
f6185e7  test formatting (Jan 5, 2022)
03d3b2e  oops (Jan 5, 2022)
65b9893  black (Jan 5, 2022)
f3e18a2  bulk tests (Jan 5, 2022)
7993bba  bulk execution cannot stall with small delays (Jan 5, 2022)
2c55fcb  verifying Bulk settings (Jan 5, 2022)
e7449a6  explicitly checking exception types (Jan 5, 2022)
64a0be5  renamed BulkExecution to AsyncBulkCall (Jan 10, 2022)
724e751  pulled queue reading into AsyncBulkCall class (Jan 10, 2022)
d11872d  documented bulk settings (Jan 10, 2022)
5dbe599  using cached_property for lazy attributes (Jan 10, 2022)
4c274b6  "This is the preferred way to create Futures in asyncio." (Jan 10, 2022)
5bd66e1  test name matches implementation name (Jan 10, 2022)
340e541  quoted type hints (Jan 10, 2022)
86f105f  adjusted code for Python 3.6 (Jan 10, 2022)
b2dbc4a  testing elapsed duration instead of call count (Jan 10, 2022)
31de4b7  incrased test timing to accomodate JIT (Jan 10, 2022)
ef537af  black (Jan 10, 2022)
4561e52  using variable grace for timing (Jan 10, 2022)
b10e04e  further timing tweaks (Jan 10, 2022)
d909bd1  updated all parts of the test... (Jan 10, 2022)
be36258  documented _condor_tool (Jan 12, 2022)
f7c286f  adjusted docstring (Jan 12, 2022)
74ffd75  concurrent claims are handled by the task, not bulk execution (Jan 12, 2022)
ce859f7  off by one (Jan 12, 2022)
ba5ff71  normalize UUIDs during condor_q lookups (Jan 31, 2022)
612fb7d  automatically inserting queue statements (Jan 31, 2022)
42f2aab  concurrency only accepts int or None (Jan 31, 2022)
4eeea62  debug log on rm/suspend failure (Feb 1, 2022)
d07f59c  using suspend-until-first instead of spin lock waiting (Feb 1, 2022)
0079046  Merge branch 'master' into feature/bulk_htcondor (Feb 1, 2022)
94d0613  removed unused import (Feb 2, 2022)
cea7d68  updated black (Feb 3, 2022)
f673f2d  added example for AsyncBulkCall + partial (Feb 3, 2022)
f8b9c70  adjusted wording of delay parameter (Feb 3, 2022)
8 changes: 8 additions & 0 deletions docs/source/adapters/site.rst

@@ -199,6 +199,14 @@ Available adapter configuration options
| Option         | Short Description                                                                      | Requirement     |
+================+========================================================================================+=================+
| max_age        | The result of the `condor_status` call is cached for `max_age` in minutes.            | **Required**    |
+----------------+----------------------------------------------------------------------------------------+-----------------+
| bulk_size      | Maximum number of jobs to handle per bulk invocation of a condor tool.                | **Optional**    |
+                +                                                                                        +                 +
|                | Default: 100                                                                           |                 |
+----------------+----------------------------------------------------------------------------------------+-----------------+
| bulk_delay     | Maximum duration in seconds to wait per bulk invocation of a condor tool.             | **Optional**    |
+                +                                                                                        +                 +
|                | Default: 1.0                                                                           |                 |
+----------------+----------------------------------------------------------------------------------------+-----------------+
| executor       | The |executor| used to run submission and further calls to the HTCondor batch system. | **Optional**    |
+                +                                                                                        +                 +
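For illustration, the new options would appear in the YAML site configuration roughly as follows (a minimal sketch: the site name, quota, and surrounding structure are illustrative placeholders; only max_age, bulk_size, and bulk_delay come from the table above):

Sites:
  - name: MyHTCondorSite
    adapter: HTCondor
    quota: 100

MyHTCondorSite:
  max_age: 1
  bulk_size: 100   # at most 100 jobs per condor tool invocation
  bulk_delay: 1.0  # flush a partial bulk after at most one second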
2 changes: 2 additions & 0 deletions setup.py

@@ -78,6 +78,8 @@ def get_cryptography_version():
        "kubernetes_asyncio",
        "pydantic",
        "asyncstdlib",
+       "typing_extensions",
+       "backports.cached_property",
    ],
    extras_require={
        "docs": [
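The two new dependencies back-port Python 3.8 features to the older interpreters this PR still supports (see the "Protocol from typing_extensions" and "using cached_property for lazy attributes" commits). A sketch of the kind of compatibility import this enables; note the executor interface below simply imports Protocol from typing_extensions unconditionally:

try:  # functools.cached_property exists from Python 3.8 onwards
    from functools import cached_property
except ImportError:
    from backports.cached_property import cached_property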
157 changes: 124 additions & 33 deletions tardis/adapters/sites/htcondor.py
@@ -1,12 +1,15 @@
from typing import Iterable, Tuple, Awaitable
from ...exceptions.executorexceptions import CommandExecutionFailure
from ...exceptions.tardisexceptions import TardisError
from ...exceptions.tardisexceptions import TardisResourceStatusUpdateFailed
from ...interfaces.siteadapter import SiteAdapter
from ...interfaces.siteadapter import ResourceStatus
from ...interfaces.executor import Executor
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.attributedict import AttributeDict
from ...utilities.staticmapping import StaticMapping
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.asyncbulkcall import AsyncBulkCall
from ...utilities.utils import csv_parser, machine_meta_data_translation

from contextlib import contextmanager
@@ -20,6 +23,14 @@
logger = logging.getLogger("cobald.runtime.tardis.adapters.sites.htcondor")


# TODO: Remove this once the old-style UUIDs are deprecated
def _job_id(resource_uuid: str) -> str:
    """
    Normalize single "ClusterID" and bulk "ClusterID.ProcID" UUIDs to job IDs
    """
    return resource_uuid if "." in resource_uuid else f"{resource_uuid}.0"

async def htcondor_queue_updater(executor):
    attributes = dict(
        Owner="Owner", JobStatus="JobStatus", ClusterId="ClusterId", ProcId="ProcId"

@@ -40,10 +51,94 @@ async def htcondor_queue_updater(executor):
        delimiter="\t",
        replacements=dict(undefined=None),
    ):
-        htcondor_queue[row["ClusterId"]] = row
+        row["JobId"] = f"{row['ClusterId']}.{row['ProcId']}"
+        htcondor_queue[row["JobId"]] = row
    return htcondor_queue
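To illustrate the new keying (row values made up): each parsed condor_q row is now stored under its full "ClusterId.ProcId" job ID, matching the IDs the bulk tools report:

row = {"Owner": "tardis", "JobStatus": "2", "ClusterId": "15556", "ProcId": "0"}
row["JobId"] = f"{row['ClusterId']}.{row['ProcId']}"
assert row["JobId"] == "15556.0"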


JDL = str

# search the Job ID in a submit Proc line
SUBMIT_ID_PATTERN = re.compile(r"Proc\s(\d+\.\d+)")


async def condor_submit(*resource_jdls: JDL, executor: Executor) -> Iterable[str]:
    """Submit a number of resources from their JDL, reporting the new Job ID for each"""
    # verbose submit gives an ordered listing of class ads, such as
    #   ** Proc 15556.0:
    #   Args = "150"
    #   ClusterId = 15556
    #   ...
    #   ProcId = 0
    #   QDate = 1641289701
    #   ...
    #
    #   ** Proc 15556.1:
    #   ...
    command = f"condor_submit -verbose -maxjobs {len(resource_jdls)}"
    response = await executor.run_command(command, stdin_input="\n".join(resource_jdls))
    return (
        SUBMIT_ID_PATTERN.search(line).group(1)
        for line in response.stdout.splitlines()
        if line.startswith("** Proc")
    )
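A quick self-contained check of the ID extraction, using a line in the verbose output format quoted above (the sample value is made up):

import re

SUBMIT_ID_PATTERN = re.compile(r"Proc\s(\d+\.\d+)")

line = "** Proc 15556.0:"
assert line.startswith("** Proc")
assert SUBMIT_ID_PATTERN.search(line).group(1) == "15556.0"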


# condor_rm and condor_suspend are actually the same tool under the hood;
# they only differ in the method called on the Schedd and their success message
def condor_rm(
    *resource_attributes: AttributeDict, executor: Executor
) -> Awaitable[Iterable[bool]]:
    """Remove a number of resources, indicating success for each"""
    return condor_tool(resource_attributes, executor, "condor_rm", "marked for removal")


def condor_suspend(
    *resource_attributes: AttributeDict, executor: Executor
) -> Awaitable[Iterable[bool]]:
    """Suspend a number of resources, indicating success for each"""
    return condor_tool(resource_attributes, executor, "condor_suspend", "suspended")


# search the Job ID in a remove/suspend mark line
TOOL_ID_PATTERN = re.compile(r"Job\s(\d+\.\d+)")


async def condor_tool(
    resource_attributes: Tuple[AttributeDict, ...],
    executor: Executor,
    command: str,
    success_message: str,
) -> Iterable[bool]:
    command = (
        command
        + " "
        + " ".join(
            _job_id(resource.remote_resource_uuid) for resource in resource_attributes
        )
    )
    try:
        response = await executor.run_command(command)
    except CommandExecutionFailure as cef:
        # the tool fails if none of the jobs are found – because they all just shut down
        # report graceful failure for all
        if cef.exit_code == 1 and "not found" in cef.stderr:
            return [False] * len(resource_attributes)
        raise
    # successes are in stdout, failures in stderr, both in argument order
    # stdout: Job 15540.0 marked for removal
    # stderr: Job 15612.0 not found
[Review thread]
Reviewer: Does a "not found" need to be treated here as well? At least I would like to have logger.debug output here.
Author (maxfischer2781): Sure, I can add a debug log. Would you be okay if it's next to the raise TardisResourceStatusUpdateFailed in the adapter methods? It seems we don't get any sensible debug output from condor anyway.
Reviewer: That would be okay!
    # stderr: Job 15535.0 marked for removal
    success_jobs = {
        TOOL_ID_PATTERN.search(line).group(1)
        for line in response.stdout.splitlines()
        if line.endswith(success_message)
    }
    return (
        _job_id(resource.remote_resource_uuid) in success_jobs
        for resource in resource_attributes
    )
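The same parsing, exercised on sample output lines in the format shown in the comments above (values made up; a sketch, not a test from this PR):

import re

TOOL_ID_PATTERN = re.compile(r"Job\s(\d+\.\d+)")

stdout = "Job 15540.0 marked for removal\nJob 15541.0 marked for removal"
success_jobs = {
    TOOL_ID_PATTERN.search(line).group(1)
    for line in stdout.splitlines()
    if line.endswith("marked for removal")
}
assert success_jobs == {"15540.0", "15541.0"}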


# According to https://htcondor.readthedocs.io/en/latest/classad-attributes/
# job-classad-attributes.html
htcondor_status_codes = {
@@ -62,13 +157,27 @@ class HTCondorAdapter(SiteAdapter):
        Cores=1, Memory=1024, Disk=1024 * 1024
    )

-    def __init__(self, machine_type: str, site_name: str):
+    def __init__(
+        self,
+        machine_type: str,
+        site_name: str,
+    ):
        self._machine_type = machine_type
        self._site_name = site_name
        self._executor = getattr(self.configuration, "executor", ShellExecutor())
+        bulk_size = getattr(self.configuration, "bulk_size", 100)
+        bulk_delay = getattr(self.configuration, "bulk_delay", 1.0)
+        self._condor_submit, self._condor_suspend, self._condor_rm = (
+            AsyncBulkCall(
+                partial(tool, executor=self._executor),
+                size=bulk_size,
+                delay=bulk_delay,
+            )
+            for tool in (condor_submit, condor_suspend, condor_rm)
+        )

        key_translator = StaticMapping(
-            remote_resource_uuid="ClusterId",
+            remote_resource_uuid="JobId",
[Review thread]
Reviewer: Can this potentially have any impact?
Author (maxfischer2781): Good question – not as far as I can tell, but @giffels better take a look at this as well. ;)
Reviewer: This already attracted my attention. ;)
Reviewer: It seems to have a negative effect on drones already present in the system that were started with an older version of tardis. tardis reports the following:

    cobald.runtime.tardis.resources.dronestates: 2022-01-28 17:14:38 Resource attributes: {'site_name': 'TOPAS-GPU', 'machine_type': 'singlegpu', 'obs_machine_meta_data_translation_mapping': {'Cores': 1, 'Memory': 1024, 'Disk': 1048576}, 'remote_resource_uuid': '8598911', 'created': datetime.datetime(2022, 1, 28, 17, 8, 6, 88314), 'updated': datetime.datetime(2022, 1, 28, 17, 8, 6, 88626), 'drone_uuid': 'topas-gpu-2463f9fb3f', 'resource_status': <ResourceStatus.Deleted: 4>}

while at the same time the job is still running in HTCondor:

    -bash-4.2$ condor_q 8598911

    -- Schedd: f03-001-140-e.gridka.de : <192.108.47.243:9618?... @ 01/28/22 17:16:44
    OWNER  BATCH_NAME     SUBMITTED   DONE   RUN    IDLE  TOTAL JOB_IDS
    tardis ID: 8598911   1/28 17:08      _      1      _      1 8598911.0

Reviewer: I think the problem is in

    resource_uuid = resource_attributes.remote_resource_uuid
    resource_status = self._htcondor_queue[resource_uuid]

If you apply _job_id on remote_resource_uuid it should be fine.
Author (maxfischer2781, Feb 1, 2022): Thanks for the heads up. Changed as suggested.
Reviewer: Thanks
resource_status="JobStatus",
created="created",
updated="updated",
@@ -116,13 +225,8 @@ async def deploy_resource(
            ),
        )

-        response = await self._executor.run_command(
-            "condor_submit", stdin_input=submit_jdl
-        )
-        pattern = re.compile(
-            r"^.*?(?P<Jobs>\d+).*?(?P<ClusterId>\d+).$", flags=re.MULTILINE
-        )
-        response = AttributeDict(pattern.search(response.stdout).groupdict())
+        job_id = await self._condor_submit(submit_jdl)
[Review thread]
Reviewer: Potentially it could be possible to get several job_ids after executing this. Can this be handled already, or is this on purpose for one job only at the moment?
Author (maxfischer2781): We cannot actually get more than one job_id here, since the bulk execution framework forbids it. But indeed the JDL template could queue more than one job or even none.
@giffels would you object to deprecating the use of queue statements in the JDL template? It is simple enough to inject a queue 1 automatically – I don't really see any advantage in adding queue manually but many ways it can go wrong.
Reviewer: I am just afraid that a removal will break backwards compatibility. Officially, we should deprecate it first and remove it later on in 0.8.0. So, for example, check that it is in there and print a warning like we did in

    warnings.warn(
        "StartupCommand has been moved to the machine_type_configuration!",
        DeprecationWarning,
    )

otherwise add queue 1.
Author (maxfischer2781): I've added a helper that checks for queue statements and inserts one otherwise.
Reviewer: Great! Thanks!
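The helper mentioned above is not part of the 44-commit view shown here (it landed later, in commit 612fb7d, "automatically inserting queue statements"). A hypothetical sketch of its shape, with the name and regex assumed rather than taken from the PR:

import re

# hypothetical: match a JDL `queue` statement on its own line
QUEUE_PATTERN = re.compile(r"^queue(\s.*)?$", flags=re.MULTILINE | re.IGNORECASE)

def ensure_queue_statement(jdl: str) -> str:
    """Append `queue 1` unless the JDL already contains a queue statement."""
    if QUEUE_PATTERN.search(jdl):
        return jdl
    return jdl + "\nqueue 1"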

+        response = AttributeDict(JobId=job_id)
        response.update(self.create_timestamps())
        return self.handle_response(response)
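In use, the bulk wiring is transparent to callers (an illustrative sketch of the semantics described in this PR, not code from it): each task awaits what looks like a single submission, while AsyncBulkCall gathers up to bulk_size concurrent calls, or waits at most bulk_delay seconds, and dispatches one condor tool invocation for the whole batch:

import asyncio

async def deploy_many(adapter, jdls):
    # ten concurrent "single" submissions end up as a single
    # `condor_submit -verbose -maxjobs 10` call under the hood
    return await asyncio.gather(*(adapter._condor_submit(jdl) for jdl in jdls))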

@@ -146,36 +250,23 @@ async def resource_status(
        else:
            return self.handle_response(resource_status)

-    async def _apply_condor_command(
-        self, resource_attributes: AttributeDict, condor_command: str
-    ):
-        command = f"{condor_command} {resource_attributes.remote_resource_uuid}"
-        try:
-            response = await self._executor.run_command(command)
-        except CommandExecutionFailure as cef:
-            if cef.exit_code == 1 and "Couldn't find" in cef.stderr:
-                # Happens if condor_suspend/condor_rm is called in the moment
-                # the drone is shutting down itself. Repeat the procedure until
-                # resource has vanished from condor_q call
-                raise TardisResourceStatusUpdateFailed from cef
-            raise
-        pattern = re.compile(r"^.*?(?P<ClusterId>\d+).*$", flags=re.MULTILINE)
-        response = AttributeDict(pattern.search(response.stdout).groupdict())
-        return self.handle_response(response)
-
    async def stop_resource(self, resource_attributes: AttributeDict):
        """
        Stopping machines is equivalent to suspending jobs in HTCondor,
        therefore condor_suspend is called!
        """
-        return await self._apply_condor_command(
-            resource_attributes, condor_command="condor_suspend"
-        )
+        if await self._condor_suspend(resource_attributes):
+            return self.handle_response(
+                AttributeDict(JobId=resource_attributes.remote_resource_uuid)
+            )
+        raise TardisResourceStatusUpdateFailed

    async def terminate_resource(self, resource_attributes: AttributeDict):
-        return await self._apply_condor_command(
-            resource_attributes, condor_command="condor_rm"
-        )
+        if await self._condor_rm(resource_attributes):
+            return self.handle_response(
+                AttributeDict(JobId=resource_attributes.remote_resource_uuid)
+            )
+        raise TardisResourceStatusUpdateFailed
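As agreed in the review thread earlier, a debug log next to these raises was added later (commit 4eeea62, "debug log on rm/suspend failure", not part of the 44-commit view shown here); a sketch with hypothetical wording:

        logger.debug("condor_suspend failed for %s", resource_attributes)
        raise TardisResourceStatusUpdateFailed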

    @staticmethod
    def create_timestamps():
15 changes: 14 additions & 1 deletion tardis/interfaces/executor.py
@@ -1,7 +1,20 @@
+from typing import Optional
+from typing_extensions import Protocol
from abc import ABCMeta, abstractmethod


+class CommandResult(Protocol):
+    stdout: str
+    stderr: str
+    exitcode: int
+
+
class Executor(metaclass=ABCMeta):
    @abstractmethod
-    async def run_command(self, command):
+    async def run_command(
+        self, command: str, stdin_input: Optional[str] = None
+    ) -> CommandResult:
+        """
+        Run ``command`` in a shell and provide the result
+        """
        return NotImplemented
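A minimal executor satisfying this interface might look as follows (an illustrative sketch only; the project's real implementations, such as the ShellExecutor, live under tardis/utilities/executors):

import asyncio
from typing import NamedTuple, Optional


class ShellResult(NamedTuple):
    # structurally matches the CommandResult protocol
    stdout: str
    stderr: str
    exitcode: int


class LocalShellExecutor:
    async def run_command(
        self, command: str, stdin_input: Optional[str] = None
    ) -> ShellResult:
        # run the command in a local shell, feeding stdin_input if given
        process = await asyncio.create_subprocess_shell(
            command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate(
            input=stdin_input.encode() if stdin_input is not None else None
        )
        return ShellResult(stdout.decode(), stderr.decode(), process.returncode)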
2 changes: 1 addition & 1 deletion tardis/interfaces/siteadapter.py

@@ -157,7 +157,7 @@ def handle_response(
        of the original response of the provider in keys of the common format.
        :type key_translator: dict
        :param translator_functions: A dictionary containing functions to
-        transform value of the original reponse of the provider into values of
+        transform value of the original response of the provider into values of
        the common format.
        :type translator_functions: dict
        :param additional_content: Additional content to be put into response,