Skip to content

Commit

Permalink
fix even more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Nov 26, 2024
1 parent f609f5e commit 84e389a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ async def evaluate(self, config, _, __):
)
async with Client(config.url + "/dispatch") as dispatch:
event = EnsembleStarted(ensemble=self.id_)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))

event_id += 1
for real in range(0, self.test_reals):
real = str(real)

event = RealizationUnknown(ensemble=self.id_, real=real)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))

event_id += 1
for fm_step in range(0, self.fm_steps):
Expand All @@ -95,7 +95,7 @@ async def evaluate(self, config, _, __):
fm_step=fm_step,
current_memory_usage=1000,
)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))
event_id += 1

event = ForwardModelStepSuccess(
Expand All @@ -104,16 +104,16 @@ async def evaluate(self, config, _, __):
fm_step=fm_step,
current_memory_usage=1000,
)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))
event_id += 1
event_id += 1

event = RealizationSuccess(ensemble=self.id_, real=real)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))
event_id += 1

event = EnsembleSucceeded(ensemble=self.id_)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))

@property
def cancellable(self) -> bool:
Expand Down
32 changes: 16 additions & 16 deletions tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ async def test_restarted_jobs_do_not_have_error_msgs(evaluator_to_use):
fm_step="0",
current_memory_usage=1000,
)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))

event = ForwardModelStepFailure(
ensemble=evaluator.ensemble.id_,
real="0",
fm_step="0",
error_msg="error",
)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))

def is_completed_snapshot(snapshot: EnsembleSnapshot) -> bool:
try:
Expand Down Expand Up @@ -212,7 +212,7 @@ def is_completed_snapshot(snapshot: EnsembleSnapshot) -> bool:
fm_step="0",
current_memory_usage=1000,
)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))

# reconnect new monitor
async with Monitor(config_info) as new_monitor:
Expand Down Expand Up @@ -270,23 +270,23 @@ async def test_new_monitor_can_pick_up_where_we_left_off(evaluator_to_use):
fm_step="0",
current_memory_usage=1000,
)
await dispatch1._send(event_to_json(event))
await dispatch1.send(event_to_json(event))
# second dispatch endpoint client informs that forward model 0 is running
event = ForwardModelStepRunning(
ensemble=evaluator.ensemble.id_,
real="1",
fm_step="0",
current_memory_usage=1000,
)
await dispatch2._send(event_to_json(event))
await dispatch2.send(event_to_json(event))
# second dispatch endpoint client informs that forward model 1 is running
event = ForwardModelStepRunning(
ensemble=evaluator.ensemble.id_,
real="1",
fm_step="1",
current_memory_usage=1000,
)
await dispatch2._send(event_to_json(event))
await dispatch2.send(event_to_json(event))

final_snapshot = EnsembleSnapshot()

Expand Down Expand Up @@ -330,12 +330,12 @@ def check_if_all_fm_running(snapshot: EnsembleSnapshot) -> bool:
fm_step="0",
current_memory_usage=1000,
)
await dispatch2._send(event_to_json(event))
await dispatch2.send(event_to_json(event))
# second dispatch endpoint client informs that job 1 is failed
event = ForwardModelStepFailure(
ensemble=evaluator.ensemble.id_, real="1", fm_step="1", error_msg="error"
)
await dispatch2._send(event_to_json(event))
await dispatch2.send(event_to_json(event))

def check_if_final_snapshot_is_complete(final_snapshot: EnsembleSnapshot) -> bool:
try:
Expand Down Expand Up @@ -408,31 +408,31 @@ async def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_e
fm_step="0",
current_memory_usage=1000,
)
await dispatch1._send(event_to_json(event))
await dispatch1.send(event_to_json(event))
# second dispatch endpoint client informs that real 1 fm 0 is running
event = ForwardModelStepRunning(
ensemble=evaluator.ensemble.id_,
real="1",
fm_step="0",
current_memory_usage=1000,
)
await dispatch2._send(event_to_json(event))
await dispatch2.send(event_to_json(event))
# second dispatch endpoint client informs that real 1 fm 0 is done
event = ForwardModelStepSuccess(
ensemble=evaluator.ensemble.id_,
real="1",
fm_step="0",
current_memory_usage=1000,
)
await dispatch2._send(event_to_json(event))
await dispatch2.send(event_to_json(event))
# second dispatch endpoint client informs that real 1 fm 1 is failed
event = ForwardModelStepFailure(
ensemble=evaluator.ensemble.id_,
real="1",
fm_step="1",
error_msg="error",
)
await dispatch2._send(event_to_json(event))
await dispatch2.send(event_to_json(event))

event = await anext(events)
snapshot = EnsembleSnapshot.from_nested_dict(event.snapshot)
Expand Down Expand Up @@ -496,17 +496,17 @@ async def test_ensure_multi_level_events_in_order(evaluator_to_use):
assert type(snapshot_event) is EESnapshot
async with Client(url + "/dispatch", cert=cert, token=token) as dispatch:
event = EnsembleStarted(ensemble=evaluator.ensemble.id_)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))
event = RealizationSuccess(
ensemble=evaluator.ensemble.id_, real="0", queue_event_type=""
)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))
event = RealizationSuccess(
ensemble=evaluator.ensemble.id_, real="1", queue_event_type=""
)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))
event = EnsembleSucceeded(ensemble=evaluator.ensemble.id_)
await dispatch._send(event_to_json(event))
await dispatch.send(event_to_json(event))

await monitor.signal_done()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


@pytest.mark.usefixtures("use_tmpdir")
def test_run_one_fm_step_with_an_integer_arg_is_actually_a_fractional():
async def test_run_one_fm_step_with_an_integer_arg_is_actually_a_fractional():
fm_step_1 = {
"name": "FM_STEP_1",
"executable": "echo",
Expand All @@ -20,16 +20,16 @@ def test_run_one_fm_step_with_an_integer_arg_is_actually_a_fractional():
data = {"jobList": [fm_step_1]}

runner = ForwardModelRunner(data)
statuses = list(runner.run([]))
statuses = [status async for status in runner.run([])]
starts = [e for e in statuses if isinstance(e, Start)]

assert len(starts) == 1, "There should be 1 start message"
assert not starts[0].success(), "fm_step should not start with success"


@pytest.mark.usefixtures("use_tmpdir")
def test_run_given_one_fm_step_with_missing_file_and_one_file_present():
with open("a_file", "w", encoding="utf-8") as f:
async def test_run_given_one_fm_step_with_missing_file_and_one_file_present():
with open("a_file", "w", encoding="utf-8") as f: # noqa: ASYNC230
f.write("Hello")

executable = "echo"
Expand Down Expand Up @@ -62,7 +62,7 @@ def test_run_given_one_fm_step_with_missing_file_and_one_file_present():

runner = ForwardModelRunner(data)

statuses = list(runner.run([]))
statuses = [status async for status in runner.run([])]

starts = [e for e in statuses if isinstance(e, Start)]
assert len(starts) == 2, "There should be 2 start messages"
Expand Down
5 changes: 3 additions & 2 deletions tests/ert/unit_tests/simulator/test_batch_sim.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import sys
import time
Expand Down Expand Up @@ -491,7 +492,7 @@ def assertContextStatusOddFailures(batch_ctx: BatchContext, final_state_only=Fal


@pytest.mark.integration_test
def test_batch_ctx_status_failing_jobs(setup_case, storage):
async def test_batch_ctx_status_failing_jobs(setup_case, storage):
ert_config = setup_case("batch_sim", "batch_sim_sleep_and_fail.ert")

external_parameters = {
Expand All @@ -515,6 +516,6 @@ def test_batch_ctx_status_failing_jobs(setup_case, storage):
batch_ctx = rsim.start("case_name", ensembles)
while batch_ctx.running():
assertContextStatusOddFailures(batch_ctx)
time.sleep(1)
await asyncio.sleep(1)

assertContextStatusOddFailures(batch_ctx, final_state_only=True)

0 comments on commit 84e389a

Please sign in to comment.