DATAUP-565 Throw it into a thread (#419)
* Throw it into a thread

* Throw it into a thread

Co-authored-by: bio-boris <[email protected]>
bio-boris and bio-boris authored Aug 27, 2021
1 parent 77231e1 commit 96b6f69
Showing 3 changed files with 44 additions and 14 deletions.
39 changes: 25 additions & 14 deletions lib/execution_engine2/sdk/EE2Runjob.py
@@ -5,6 +5,7 @@
"""
import os
import threading
import time
from collections import Counter
from enum import Enum
@@ -264,20 +265,7 @@ def _prepare_to_run(self, params, concierge_params=None) -> JobSubmissionParameters:
)
return self._generate_job_submission_params(job_id, params)

def _run_multiple(self, runjob_params):
"""
Get the job records, bulk save them, then submit to condor.
If any condor submission fails, abort all of the jobs
:return:
"""
# Save records to db
job_records = []
for runjob_param in runjob_params:
job_records.append(
self._init_job_rec(self.sdkmr.get_user_id(), runjob_param, save=False)
)
job_ids = self.sdkmr.save_jobs(job_records)

def _submit_multiple_wrapper(self, job_ids: list, runjob_params: List[Dict]):
# Generate job submission params
job_submission_params = []
for i, job_id in enumerate(job_ids):
@@ -301,6 +289,29 @@ def _run_multiple(self, runjob_params):
self._abort_multiple_jobs(job_ids)
raise e

def _run_multiple(self, runjob_params: List[Dict]):
"""
Get the job records, bulk save them, then submit to condor.
If any condor submission fails, abort all of the jobs
:return:
"""
# Save records to db
job_records = []
for runjob_param in runjob_params:
job_records.append(
self._init_job_rec(self.sdkmr.get_user_id(), runjob_param, save=False)
)
job_ids = self.sdkmr.save_jobs(job_records)

# Start up job submission thread
# For testing, mock this out and check to see it is called with these params?
threading.Thread(
target=self._submit_multiple_wrapper,
kwargs={"runjob_params": runjob_params, "job_ids": job_ids},
daemon=True,
).start()
return job_ids

def _update_to_queued_multiple(self, job_ids, scheduler_ids):
"""
This is called during job submission. If a job is terminated during job submission,
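The new _run_multiple above hands condor submission to a daemon thread and returns the job IDs immediately, and the inline comment suggests mocking that thread in unit tests. The following is a minimal sketch of that idea, not part of this commit: the runner fixture (an EE2RunJob with its sdkmr collaborators mocked), the runjob_params fixture, and the patch target path (derived from lib/execution_engine2/sdk/EE2Runjob.py) are assumptions for illustration only.

from unittest.mock import patch

def test_run_multiple_starts_submission_thread(runner, runjob_params):
    # Patch Thread as seen from the EE2Runjob module so no real thread is spawned.
    with patch("execution_engine2.sdk.EE2Runjob.threading.Thread") as thread_mock:
        job_ids = runner._run_multiple(runjob_params)
    # The daemon thread should be created with the wrapper as target and started once.
    thread_mock.assert_called_once_with(
        target=runner._submit_multiple_wrapper,
        kwargs={"runjob_params": runjob_params, "job_ids": job_ids},
        daemon=True,
    )
    thread_mock.return_value.start.assert_called_once()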
12 changes: 12 additions & 0 deletions test/tests_for_integration/api_to_db_test.py
@@ -1204,6 +1204,10 @@ def test_run_job_batch(ee2_port, ws_controller, mongo_client):
}
ee2 = ee2client(f"http://localhost:{ee2_port}", token=TOKEN_NO_ADMIN)
ret = ee2.run_job_batch([job1_params, job2_params], job_batch_params)

# May need to increase sleep if thread takes too long
time.sleep(0.1)

batch_id = ret["batch_id"]
job_id_1, job_id_2 = ret["child_job_ids"]

@@ -1417,6 +1421,10 @@ def test_run_job_batch_with_no_batch_wsid(ee2_port, ws_controller, mongo_client):
}
ee2 = ee2client(f"http://localhost:{ee2_port}", token=TOKEN_NO_ADMIN)
ret = ee2.run_job_batch([job1_params, job2_params], job_batch_params)

# May need to increase sleep if thread takes too long
time.sleep(0.1)

batch_id = ret["batch_id"]
job_id_1, job_id_2 = ret["child_job_ids"]

@@ -1614,6 +1622,10 @@ def test_run_job_batch_as_admin_with_job_reqs(ee2_port, ws_controller, mongo_client):
job_batch_params = {"wsid": job_batch_wsid, "as_admin": "foo"}
ee2 = ee2client(f"http://localhost:{ee2_port}", token=TOKEN_WRITE_ADMIN)
ret = ee2.run_job_batch([job1_params, job2_params], job_batch_params)

# May need to increase sleep if thread takes too long
time.sleep(0.1)

batch_id = ret["batch_id"]
job_id_1, job_id_2 = ret["child_job_ids"]

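The integration tests above pause for a fixed 0.1 s so the background submission thread can finish before the child jobs are inspected, and the comments note the value may need to grow. One hedged alternative, sketched below and not part of this commit, is a small polling helper; the predicate (for example, "both child job documents are visible via the client or in mongo") is left to the caller.

import time

def wait_for(condition, timeout=5.0, interval=0.05):
    """Poll a zero-argument callable until it returns a truthy value, or fail on timeout."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if condition():
            return
        time.sleep(interval)
    raise TimeoutError(f"condition not met within {timeout}s")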
7 changes: 7 additions & 0 deletions test/tests_for_sdkmr/EE2Runjob_test.py
@@ -5,6 +5,7 @@
# Incomplete by a long way. Will add more unit tests as they come up.

import copy
import time
from logging import Logger
from typing import List, Dict, Any
from unittest.mock import create_autospec, call
@@ -1050,6 +1051,8 @@ def test_run_job_batch_with_cancellation_during_submit():
"batch_id": _JOB_ID,
"child_job_ids": [_JOB_ID_1, _JOB_ID_2],
}
# May need to increase sleep if thread takes too long
time.sleep(0.1)

# check mocks called as expected. The order here is the order that they're called in the code.
mocks[WorkspaceAuth].can_write.assert_called_once_with(parent_wsid)
@@ -1143,6 +1146,8 @@ def test_run_job_batch_with_parent_job_wsid():
"batch_id": _JOB_ID,
"child_job_ids": [_JOB_ID_1, _JOB_ID_2],
}
# May need to increase sleep if thread takes too long
time.sleep(0.1)

# check mocks called as expected. The order here is the order that they're called in the code.
mocks[WorkspaceAuth].can_write.assert_called_once_with(parent_wsid)
@@ -1249,6 +1254,8 @@ def test_run_job_batch_as_admin_with_job_requirements():
"batch_id": _JOB_ID,
"child_job_ids": [_JOB_ID_1, _JOB_ID_2],
}
# May need to increase sleep if thread takes too long
time.sleep(0.1)

# check mocks called as expected. The order here is the order that they're called in the code.
sdkmr.check_as_admin.assert_called_once_with(JobPermissions.WRITE)
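The unit tests above also sleep before asserting on the mocks. Another hedged option, shown as a sketch only and not part of this commit, is to substitute a stub that runs its target synchronously; patching threading.Thread (as seen from the EE2Runjob module) with such a class would let the condor-submission mocks populate before the assertions run, removing the need for the sleep.

class _InlineThread:
    """Hypothetical stand-in for threading.Thread that runs its target synchronously on start()."""

    def __init__(self, target=None, args=(), kwargs=None, daemon=None):
        self._target, self._args, self._kwargs = target, args, kwargs or {}

    def start(self):
        self._target(*self._args, **self._kwargs)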
