Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make forwardmodelrunner async #9198

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open

Conversation

jonathan-eq
Copy link
Contributor

@jonathan-eq jonathan-eq commented Nov 12, 2024

Issue
Resolves #9041

Approach
We will get a lot of those errors from the forward model runner (compute cluster) when we forcefully signal kill, and dont let it shut down gracefully (a lot of those errors are due to websocket connection being suddenly dropped, and not closed). Making it async would allow us to early return from the runner's run_method by calling Task.cancel on it, and we could do some cleanup on asyncio.CancellationError.

(Screenshot of new behavior in GUI if applicable)

  • PR title captures the intent of the changes, and is fitting for release notes.
  • Added appropriate release note label
  • Commit history is consistent and clean, in line with the contribution guidelines.
  • Make sure unit tests pass locally after every commit (git rebase -i main --exec 'pytest tests/ert/unit_tests -n logical -m "not integration_test"')

When applicable

  • When there are user facing changes: Updated documentation
  • New behavior or changes to existing untested code: Ensured that unit tests are added (See Ground Rules).
  • Large PR: Prepare changes in small commits for more convenient review
  • Bug fix: Add regression test for the bug
  • Bug fix: Create Backport PR to latest release

@jonathan-eq jonathan-eq changed the title Add just command helper tool to repository Make forwardmodelrunner async Nov 13, 2024
@jonathan-eq jonathan-eq force-pushed the main4 branch 4 times, most recently from 098b7d4 to 784237d Compare November 19, 2024 08:32
@jonathan-eq jonathan-eq force-pushed the main4 branch 2 times, most recently from 3be3a96 to 4de22d3 Compare November 21, 2024 12:42
@jonathan-eq jonathan-eq marked this pull request as ready for review November 25, 2024 08:05
@jonathan-eq jonathan-eq self-assigned this Nov 25, 2024
@jonathan-eq jonathan-eq added release-notes:improvement Automatically categorise as improvement in release notes release-notes:bug-fix Automatically categorise as bug fix in release notes labels Nov 25, 2024
@@ -71,26 +79,26 @@ def _setup_logging(directory: str = "logs"):
JOBS_JSON_RETRY_TIME = 30


def _wait_for_retry():
time.sleep(JOBS_JSON_RETRY_TIME)
async def _wait_for_retry():
Copy link
Contributor

@xjules xjules Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we need this helper function at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need it for one of the tests. test_job_dispatch.py::test_retry_of_jobs_json_file_read

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this usage of that function is a bit strange though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mock it to lock.acquire in a test, so that it will stop here

message_queue: asyncio.Queue[Message],
done: asyncio.Event,
):
while not done.is_set() or not message_queue.empty():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not message_queue.empty() looks dangerous. I think that this get handled in here:
job_status = await asyncio.wait_for(message_queue.get(), timeout=2) or ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted it to process all the events in the queue before exiting, but I can rewrite it to be clearer ✏️

nonlocal reporters, forward_model_runner_task
forward_model_runner_task.cancel()
for reporter in reporters:
reporter.cancel()
Copy link
Contributor

@xjules xjules Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try:
  await reporter
except asyncio.CancelledError:
  pass

or maybe just asyncio.gather(*reporters, return ....)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signal handler has to be synced, but we await the task anyways so it should be fine.

Copy link
Contributor

@xjules xjules Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To shutdown gracefully, this is what chatgpt suggests:

def setup_signal_handlers(loop):
    """
    Setup signal handlers for graceful shutdown.
    """
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop, signal=sig)))

wherein shutdown is an async function.

await self._dump_event(fm_checksum)

def cancel(self) -> None:
self._event_publishing_task.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this suppose to be a "blocking" operation then we should await to task to finish afterwards; ie.

self._event_publishing_task.cancel()
try:
   await self._event_publishing_task
except asyncio.CancelledError:
 ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also has to be synced as it is used in the eventloop's signal_handler

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I need to look deeper in the loop signal_handler 👍

url=self._evaluator_url,
token=self._token,
cert=self._cert,
) as client:
event = None
while True:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was combined with the timeout_timestamp

@jonathan-eq
Copy link
Contributor Author

I want to keep this PR around for a future attempt at making fmrunner async.

@jonathan-eq jonathan-eq removed release-notes:bug-fix Automatically categorise as bug fix in release notes christmas-review labels Dec 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release-notes:improvement Automatically categorise as improvement in release notes
Projects
Status: Backlog
Development

Successfully merging this pull request may close these issues.

A lot of errors are displayed when choosing to terminate experiment
3 participants