Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bio-boris committed Nov 4, 2021
1 parent b4bab18 commit 6f9e6f9
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 76 deletions.
6 changes: 3 additions & 3 deletions execution_engine2.spec
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@
Requires the user to keep track of the job states of the Status enum in the ee2 models file
If no status_list is provided, an exception is thrown.
*/
funcdef retry_batch_jobs(BatchRetryParams params) returns (list<RetryResult> retry_result) authentication required;
funcdef retry_batch_jobs_by_status(BatchRetryParams params) returns (list<RetryResult> retry_result) authentication required;


funcdef abandon_children(AbandonChildren params)
Expand Down Expand Up @@ -608,7 +608,7 @@

/*
batch_job_id: BATCH_ID to cancel
status_list: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both
status_list: required filter of one or more of [created, estimating, queued, or running]
terminated_code: optional terminated code, default to terminated by user
as_admin: retry someone else's job in your namespace
@optional terminated_code
Expand All @@ -626,7 +626,7 @@
(Requires the user to keep track of the job states of the Status enum in the ee2 models file)
If no status_list is provided, an exception is thrown.
*/
funcdef cancel_batch_job(BatchCancelParams params) returns (list<job_id> job_ids) authentication required;
funcdef cancel_batch_jobs_by_status(BatchCancelParams params) returns (list<job_id> job_ids) authentication required;

/*
job_id - id of job running method
Expand Down
10 changes: 5 additions & 5 deletions lib/execution_engine2/db/MongoUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,15 @@ def eligible_for_retry(self, job: Job):
"""
Checks the job record to see if it has any retry_ids,
and if those retry_ids do not contain an ineligble job state
:param job:
:return:
:param job: Should be a child job of a BATCH job
"""
if job.retry_ids == []:

if not job.retry_ids:
return True
eligble_states = [Status.completed.value, Status.error.value]
valid_statuses = [Status.terminated.value, Status.error.value]
jobs = self.get_jobs(job_ids=job.retry_ids)
for job in jobs:
if job.status not in eligble_states:
if job.status not in valid_statuses:
return False
return True

Expand Down
2 changes: 1 addition & 1 deletion lib/execution_engine2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, msg=None, *args, **kwargs):


class InvalidStatusListException(ExecutionEngineValueError):
"""Invalid job state status provided"""
"""Invalid job status provided"""


class BatchTerminationException(ExecutionEngineException):
Expand Down
30 changes: 19 additions & 11 deletions lib/execution_engine2/sdk/EE2Runjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,9 @@ def retry(self, job_id: str, as_admin: bool = False) -> Dict[str, Optional[str]]
job_id=job_id, job=job, batch_job=batch_job, as_admin=as_admin
)

def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False):
def retry_jobs_in_batch_by_status(
self, job_id: str, status_list: list, as_admin: bool = False
):
"""
Retry jobs by status given a BATCH_ID
:param job_id: The batch job id to retry jobs for
Expand All @@ -774,19 +776,25 @@ def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False):
raise InvalidStatusListException(
f"Provide a list of status codes from {valid_statuses}."
)
for status in status_list:
if not self._retryable(status):
raise InvalidStatusListException(
f"Provided status {status} not retryable . Status not in {valid_statuses}"
)

invalid_statuses = [
status for status in status_list if not self._retryable(status)
]
if len(invalid_statuses):
raise InvalidStatusListException(
f"Provided status list contains {invalid_statuses}, which are not retryable. "
+ f"Status not in {valid_statuses}"
)

batch_job = self.sdkmr.get_job_with_permission(
job_id, JobPermissions.WRITE, as_admin=as_admin
)
if not batch_job.batch_job:
raise NotBatchJobException(f"{job_id} is not a batch job")
# Retry and Report
# Get jobs that do NOT have a retry_parent, i.e. only jobs that haven't been retried and retry_parents only
# Get jobs that
# do NOT have a retry_parent, i.e. only jobs that haven't been retried
# and jobs that have not retried retry_parents only
potentially_retryable_child_jobs = (
self.sdkmr.get_mongo_util().get_jobs_with_status(
job_ids=batch_job.child_jobs,
Expand All @@ -797,13 +805,13 @@ def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False):
# So we don't want to retry jobs that have retry jobs in progress,
# or a retry job that has already been successful
retryable_child_job_ids = []
for job in potentially_retryable_child_jobs:
if self.sdkmr.get_mongo_util().eligible_for_retry(job=job):
retryable_child_job_ids.append(str(job.id))
for child_job in potentially_retryable_child_jobs:
if self.sdkmr.get_mongo_util().eligible_for_retry(job=child_job):
retryable_child_job_ids.append(str(child_job.id))

if len(retryable_child_job_ids) == 0:
raise RetryFailureException(
f"No retryable jobs found with a state of {status_list}"
f"No retryable jobs found with a status in {status_list}"
)

return self.retry_multiple(job_ids=retryable_child_job_ids)
Expand Down
48 changes: 17 additions & 31 deletions lib/execution_engine2/sdk/EE2Status.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def handle_held_job(self, cluster_id, as_admin):
# There's probably a better way and a return type, but not really sure what I need yet
return json.loads(json.dumps(j.to_mongo().to_dict(), default=str))

def cancel_batch_job(
def cancel_jobs_in_batch_by_status(
self, job_id, status_list, terminated_code=None, as_admin=False
):
"""
Expand All @@ -111,11 +111,14 @@ def cancel_batch_job(
raise InvalidStatusListException(
f"Provide a list of valid job statuses from {valid_statuses}."
)
for status in status_list:
if status not in valid_statuses:
raise InvalidStatusListException(
f"Provided status {status} not in {valid_statuses}."
)

found_invalid_statuses = [
status for status in status_list if status not in valid_statuses
]
if len(found_invalid_statuses):
raise InvalidStatusListException(
f"Provided status list contains {found_invalid_statuses}, which are not cancelable. Status not in {valid_statuses}"
)

batch_job = self.sdkmr.get_job_with_permission(
job_id, JobPermissions.WRITE, as_admin=as_admin
Expand All @@ -125,31 +128,20 @@ def cancel_batch_job(

# Termination
terminated_ids = []
child_jobs = self.sdkmr.get_mongo_util().get_jobs_with_status(
job_ids=batch_job.child_jobs, status_list=status_list
child_job_ids = self.sdkmr.get_mongo_util().get_jobs_with_status(
job_ids=batch_job.child_jobs, status_list=status_list, only_job_ids=False
)
for job in child_jobs:
try:
self.cancel_job(
job=job, terminated_code=terminated_code, as_admin=as_admin
)
terminated_ids.append(job.id)
except Exception:
# Nothing to report, a job might have finished by now
pass

# Report
if len(terminated_ids) == 0:
raise BatchTerminationException(
f"{job_id} didn't have any valid child jobs to terminate"
)
for job in child_job_ids:
self.cancel_job(job=job, terminated_code=terminated_code, as_admin=as_admin)
terminated_ids.append(str(job.id))

return terminated_ids

def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False):
"""
Authorization Required: Ability to Read and Write to the Workspace
Terminates child jobs as well
Need to provide exactly one job id or a Job
:param job: Job Object to cancel
:param job_id: Job ID To cancel
:param terminated_code: Default Terminated By User
Expand All @@ -170,20 +162,14 @@ def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False
terminated_code = TerminatedCode.terminated_by_user.value

self.sdkmr.get_mongo_util().cancel_job(
job_id=job_id, terminated_code=terminated_code
job_id=job.id, terminated_code=terminated_code
)
for child_job_id in job.child_jobs:
self.cancel_job(
job_id=child_job_id,
terminated_code=TerminatedCode.terminated_by_batch_abort.value,
)

for child_job_id in job.child_jobs:
self.cancel_job(
job_id=child_job_id,
terminated_code=TerminatedCode.terminated_by_batch_abort.value,
)

self.sdkmr.logger.debug(
f"About to cancel job in CONDOR using jobid {job_id} {job.scheduler_id}"
)
Expand Down
4 changes: 2 additions & 2 deletions lib/execution_engine2/sdk/SDKMethodRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def retry(self, job_id, as_admin=False):

def retry_batch(self, job_id, status_list, as_admin=False):
"""Authorization Required Read/Write"""
return self.get_runjob().retry_batch(
return self.get_runjob().retry_jobs_in_batch_by_status(
job_id=job_id, as_admin=as_admin, status_list=status_list
)

Expand Down Expand Up @@ -402,7 +402,7 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False):

def cancel_batch_job(self, job_id, status_list, terminated_code, as_admin=False):
"""Authorization Required Read/Write"""
return self.get_jobs_status().cancel_batch_job(
return self.get_jobs_status().cancel_jobs_in_batch_by_status(
job_id=job_id,
terminated_code=terminated_code,
status_list=status_list,
Expand Down
43 changes: 20 additions & 23 deletions test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,14 +591,13 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock):
== "Provide a list of status codes from ['terminated', 'error']."
)

for status_list in [["running"], ["apple"], [None]]:
status = status_list[0]
for status_list in [["running"], ["apple", "kertuffl"], [None]]:
expected_exception = InvalidStatusListException
with self.assertRaises(expected_exception) as e:
runner.retry_batch(job_id=batch_job_id, status_list=status_list)
assert (
e.exception.args[0]
== f"Provided status {status} not retryable . Status not in ['terminated', 'error']"
== f"Provided status list contains {status_list}, which are not retryable. Status not in ['terminated', 'error']"
)

with self.assertRaises(NotBatchJobException) as e:
Expand All @@ -610,7 +609,7 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock):
runner.retry_batch(job_id=batch_job_id, status_list=status_list)
assert (
e.exception.args[0]
== f"No retryable jobs found with a state of {status_list}"
== f"No retryable jobs found with a status in {status_list}"
)

for job_id in child_jobs:
Expand Down Expand Up @@ -662,15 +661,12 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock):
for c in oc:
combos_list.append(list(c))

possible_errors = []
for item in [
Status.error.value,
Status.terminated.value,
Status.completed.value,
]:
possible_errors.append(
f"Provided status {item} not in ['created', 'queued', 'estimating', 'running']"
)
valid_statuses = [
Status.created.value,
Status.queued.value,
Status.estimating.value,
Status.running.value,
]

for status_list in combos_list:
failable = False
Expand All @@ -687,17 +683,18 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock):
runner.cancel_batch_job(
job_id=batch_job_id, status_list=status_list, terminated_code=0
)
assert e.exception.args[0] in [possible_errors]
else:
with self.assertRaises(BatchTerminationException) as e:
runner.cancel_batch_job(
job_id=batch_job_id, status_list=status_list, terminated_code=0
)
assert (
e.exception.args[0]
== f"{batch_job_id} didn't have any valid child jobs to terminate"
)

for item in valid_statuses:
try:
del status_list[status_list.index(item)]
except Exception:
pass

expected = f"Provided status list contains {status_list}, which are not cancelable. Status not in {valid_statuses}"

assert e.exception.args[0] == expected

# Assert cancelled jobs?
job_status = runner.check_job_batch(batch_id=batch_job_id)
print(job_status)

Expand Down

0 comments on commit 6f9e6f9

Please sign in to comment.