Make forwardmodelrunner async #9198
base: main
@@ -1,31 +1,39 @@
 import argparse
+import asyncio
 import json
 import logging
 import os
 import signal
 import sys
 import time
 import typing
 from datetime import datetime
+from typing import List, Sequence

 from _ert.forward_model_runner import reporting
-from _ert.forward_model_runner.reporting.message import Finish, ProcessTreeStatus
+from _ert.forward_model_runner.reporting.message import (
+    Finish,
+    Message,
+    ProcessTreeStatus,
+)
 from _ert.forward_model_runner.runner import ForwardModelRunner

 JOBS_FILE = "jobs.json"

 logger = logging.getLogger(__name__)


 class ForwardModelRunnerException(Exception):
     pass


 def _setup_reporters(
     is_interactive_run,
     ens_id,
     dispatch_url,
     ee_token=None,
     ee_cert_path=None,
     experiment_id=None,
-) -> typing.List[reporting.Reporter]:
-    reporters: typing.List[reporting.Reporter] = []
+) -> List[reporting.Reporter]:
+    reporters: List[reporting.Reporter] = []
     if is_interactive_run:
         reporters.append(reporting.Interactive())
     elif ens_id and experiment_id is None:
@@ -71,26 +79,26 @@ def _setup_logging(directory: str = "logs"):
 JOBS_JSON_RETRY_TIME = 30


-def _wait_for_retry():
-    time.sleep(JOBS_JSON_RETRY_TIME)
+async def _wait_for_retry():
+    await asyncio.sleep(JOBS_JSON_RETRY_TIME)


-def _read_jobs_file(retry=True):
+async def _read_jobs_file(retry=True):
     try:
-        with open(JOBS_FILE, "r", encoding="utf-8") as json_file:
+        with open(JOBS_FILE, "r", encoding="utf-8") as json_file:  # noqa: ASYNC230
             return json.load(json_file)
     except json.JSONDecodeError as e:
         raise IOError("Job Runner cli failed to load JSON-file.") from e
     except FileNotFoundError as e:
         if retry:
             logger.error(f"Could not find file {JOBS_FILE}, retrying")
-            _wait_for_retry()
-            return _read_jobs_file(retry=False)
+            await _wait_for_retry()
+            return await _read_jobs_file(retry=False)
         else:
             raise e


-def main(args):
+async def main(args):
     parser = argparse.ArgumentParser(
         description=(
             "Run all the jobs specified in jobs.json, "
@@ -118,7 +126,7 @@ def main(args):
     # Make sure that logging is setup _after_ we have moved to the runpath directory
     _setup_logging()

-    jobs_data = _read_jobs_file()
+    jobs_data = await _read_jobs_file()

     experiment_id = jobs_data.get("experiment_id")
     ens_id = jobs_data.get("ens_id")
@@ -135,21 +143,64 @@
         ee_cert_path,
         experiment_id,
     )
+    reporter_queue: asyncio.Queue[Message] = asyncio.Queue()
+
+    is_done = asyncio.Event()
+    forward_model_runner = ForwardModelRunner(jobs_data, reporter_queue=reporter_queue)
+    forward_model_runner_task = asyncio.create_task(
+        forward_model_runner.run(parsed_args.job)
+    )
+    reporting_task = asyncio.create_task(
+        handle_reporting(reporters, reporter_queue, is_done)
+    )
+
+    def handle_sigterm(*args, **kwargs):
+        nonlocal reporters, forward_model_runner_task
+        forward_model_runner_task.cancel()
+        for reporter in reporters:
+            reporter.cancel()

Review comment:
    try:
        await reporter
    except asyncio.CancelledError:
        pass
or maybe just …

Reply: The signal handler has to be synchronous, but we await the task anyway, so it should be fine.

Reply: To shut down gracefully, this is what ChatGPT suggests:
    def setup_signal_handlers(loop):
        """
        Setup signal handlers for graceful shutdown.
        """
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(
                sig, lambda: asyncio.create_task(shutdown(loop, signal=sig))
            )
wherein shutdown is an async function.
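
The shutdown coroutine in that suggestion is not defined anywhere in this thread; a minimal sketch of the pattern it alludes to could look like the code below. This is an illustration only, not code from the PR, and the logging call and loop.stop() are assumptions:

    import asyncio
    import logging

    async def shutdown(loop, signal=None):
        """Cancel all other tasks and stop the loop after a termination signal."""
        if signal is not None:
            logging.info(f"Received exit signal {signal.name}")
        tasks = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()]
        for task in tasks:
            task.cancel()
        # gather() with return_exceptions=True swallows the CancelledError raised by
        # each cancelled task, so the shutdown coroutine itself does not fail.
        await asyncio.gather(*tasks, return_exceptions=True)
        loop.stop()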

-    job_runner = ForwardModelRunner(jobs_data)
+    asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, handle_sigterm)
+
+    await forward_model_runner_task
+
+    is_done.set()
+    await reporting_task
+
+
+async def handle_reporting(
+    reporters: Sequence[reporting.Reporter],
+    message_queue: asyncio.Queue[Message],
+    is_done: asyncio.Event,
+):
+    while True:
+        try:
+            job_status = await asyncio.wait_for(message_queue.get(), timeout=2)
+        except asyncio.TimeoutError:
+            if is_done.is_set():
+                break
+            continue
+
-    for job_status in job_runner.run(parsed_args.job):
         logger.info(f"Job status: {job_status}")
         for reporter in reporters:
             try:
-                reporter.report(job_status)
+                await reporter.report(job_status)
             except OSError as oserror:
                 print(
                     f"job_dispatch failed due to {oserror}. Stopping and cleaning up."
                 )
                 pgid = os.getpgid(os.getpid())
                 os.killpg(pgid, signal.SIGKILL)
+                await let_reporters_finish(reporters)
                 raise ForwardModelRunnerException from oserror

+        message_queue.task_done()
         if isinstance(job_status, Finish) and not job_status.success():
             pgid = os.getpgid(os.getpid())
             os.killpg(pgid, signal.SIGKILL)
+            await let_reporters_finish(reporters)
             raise ForwardModelRunnerException(job_status.error_message)

+    await let_reporters_finish(reporters)
+
+
+async def let_reporters_finish(reporters):
+    for reporter in reporters:
+        if isinstance(reporter, reporting.Event):
+            await reporter.join()
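
Note that the diff does not show this module's entry point. Since main is now a coroutine, its caller presumably has to drive it with asyncio.run; a minimal sketch, where the exact arguments passed are an assumption:

    if __name__ == "__main__":
        asyncio.run(main(sys.argv))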

Review comment: Wondering if we need this helper function at all?

Reply: Yes, we need it for one of the tests: test_job_dispatch.py::test_retry_of_jobs_json_file_read.

Review comment: Hm, this usage of that function is a bit strange though.

Reply: We mock it to lock.acquire in a test, so that it will stop here.
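
For readers unfamiliar with that trick, the pattern being described is roughly the following. This is a hypothetical sketch, not the actual test: the patch target, fixtures, and jobs.json contents are assumptions, and it targets the synchronous pre-PR helpers for brevity (the async versions would need an awaitable stand-in rather than a bare lock.acquire):

    import json
    import threading
    from unittest.mock import patch

    # Assumed import path for the module under test.
    import _ert.forward_model_runner.cli as cli

    def test_retry_of_jobs_json_file_read(tmp_path, monkeypatch):
        monkeypatch.chdir(tmp_path)  # jobs.json does not exist here yet
        release = threading.Lock()
        release.acquire()  # held until the test has written jobs.json
        result = {}

        def read():
            # _wait_for_retry is replaced by lock.acquire, so the reader blocks
            # at the retry point instead of sleeping for JOBS_JSON_RETRY_TIME.
            with patch.object(cli, "_wait_for_retry", release.acquire):
                result["jobs"] = cli._read_jobs_file()

        reader = threading.Thread(target=read)
        reader.start()

        (tmp_path / "jobs.json").write_text(json.dumps({"jobList": []}))
        release.release()  # unblock the retry; the file now exists
        reader.join()
        assert result["jobs"] == {"jobList": []}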