Skip to content

Commit

Permalink
Add endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
bio-boris committed Sep 2, 2021
1 parent 9adfa50 commit 2d3f5ee
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 20 deletions.
9 changes: 5 additions & 4 deletions execution_engine2.spec
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@
Requires the user to keep track of the job states of the Status enum in the ee2 models file
If no status_list is provided, an exception is thrown.
*/
funcdef retry_batch_jobs(BatchRetry params) returns (list<RetryResult> retry_result) authentication required;
funcdef retry_batch_jobs(BatchRetryParams params) returns (list<RetryResult> retry_result) authentication required;


funcdef abandon_children(AbandonChildren params)
Expand Down Expand Up @@ -609,11 +609,14 @@
/*
batch_job_id: BATCH_ID to cancel
status_filter: list of job statuses whose child jobs should be cancelled (created, estimating, queued or running). Required: an exception is thrown if it is not provided.
terminated_code: optional terminated code, default to terminated by user
as_admin: cancel someone else's job in your namespace
@optional terminated_code
*/
typedef structure {
job_id batch_job_id;
list<job_status> status_filter;
int terminated_code;
boolean as_admin;
} BatchCancelParams;

Expand All @@ -623,9 +626,7 @@
(Requires the user to keep track of the job states of the Status enum in the ee2 models file)
If no status_filter is provided, an exception is thrown.
*/
funcdef cancel_batch_job(BatchCancelParams params) returns () authentication required;


funcdef cancel_batch_job(BatchCancelParams params) returns (list<job_id> job_ids) authentication required;

/*
job_id - id of job running method
Expand Down
14 changes: 14 additions & 0 deletions lib/execution_engine2/db/MongoUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job:

return job

def get_jobs_with_status(
    self, job_ids: List[str], status_list: List[str], only_job_ids: bool = False
) -> List[Job]:
    """
    Look up the jobs among `job_ids` whose status is one of `status_list`.

    :param job_ids: Non-empty list of job ids to search within
    :param status_list: Non-empty list of job statuses to match
    :param only_job_ids: When True, the query is restricted with `.only("_id")`
    :return: The `Job.objects` query result filtered by id and status
    :raises ValueError: If `job_ids` or `status_list` is empty or not a list
    """
    if not (job_ids and isinstance(job_ids, list)):
        raise ValueError("Please provide a non empty list of job ids")

    # Check status_list itself: this guard used to re-test job_ids, which let a
    # non-list status_list (e.g. a bare string) slip through to the query.
    if not (status_list and isinstance(status_list, list)):
        raise ValueError("Please provide a non empty list of job statuses")

    with self.mongo_engine_connection():
        matching_jobs = Job.objects(id__in=job_ids, status__in=status_list)
        if only_job_ids:
            return matching_jobs.only("_id")
        return matching_jobs

def get_jobs(
self, job_ids=None, exclude_fields=None, sort_id_ascending=None
) -> List[Job]:
Expand Down
8 changes: 8 additions & 0 deletions lib/execution_engine2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@ def __init__(self, msg=None, *args, **kwargs):
super().__init__(msg or self.__doc__, *args, **kwargs)


# Raised by the batch retry/cancel code paths when the supplied status list is
# empty or contains a status that is not allowed for the operation.
# NOTE(review): the docstring presumably doubles as the default error message
# (base class falls back to `self.__doc__`) - confirm before rewording it.
class InvalidStatusListException(ExecutionEngineValueError):
    """Invalid job state status provided"""


# NOTE(review): the docstring presumably doubles as the default error message
# (base class falls back to `self.__doc__`) - confirm before rewording it.
class IncorrectParamsException(ExecutionEngineValueError):
    """Wrong parameters were provided"""


# Raised by the batch retry/cancel code paths when the looked-up job's
# `batch_job` flag is falsy, i.e. the given id does not refer to a batch parent.
# NOTE(review): the docstring presumably doubles as the default error message
# (base class falls back to `self.__doc__`) - confirm before rewording it.
class NotBatchJobException(ExecutionEngineValueError):
    """Requested job is not a batch job"""


# NOTE(review): the docstring presumably doubles as the default error message
# (base class falls back to `self.__doc__`) - confirm before rewording it.
class InvalidParameterForBatch(ExecutionEngineValueError):
    """Workspace ids are not allowed in RunJobParams in Batch Mode"""

Expand Down
46 changes: 37 additions & 9 deletions lib/execution_engine2/execution_engine2Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class execution_engine2:
######################################### noqa
VERSION = "0.0.5"
GIT_URL = "[email protected]:kbase/execution_engine2.git"
GIT_COMMIT_HASH = "ac8da691d9012571b51995a68fb826c8ae32e146"
GIT_COMMIT_HASH = "9adfa50d1ea628eb3907038c9993c6a52fe34fc7"

#BEGIN_CLASS_HEADER
MONGO_COLLECTION = "jobs"
Expand Down Expand Up @@ -360,7 +360,7 @@ def run_job_batch(self, ctx, params, batch_params):
#BEGIN run_job_batch
mr = SDKMethodRunner(
user_clients=self.gen_cfg.get_user_clients(ctx),
clients = self.clients,
clients=self.clients,
job_permission_cache=self.job_permission_cache,
admin_permissions_cache=self.admin_permissions_cache
)
Expand Down Expand Up @@ -402,7 +402,7 @@ def retry_job(self, ctx, params):
#BEGIN retry_job
mr = SDKMethodRunner(
user_clients=self.gen_cfg.get_user_clients(ctx),
clients = self.clients,
clients=self.clients,
job_permission_cache=self.job_permission_cache,
admin_permissions_cache=self.admin_permissions_cache
)
Expand Down Expand Up @@ -473,6 +473,15 @@ def retry_batch_jobs(self, ctx, params):
# ctx is the context object
# return variables are: retry_result
#BEGIN retry_batch_jobs
# Build a per-request runner bound to the caller's credentials.
mr = SDKMethodRunner(
    user_clients=self.gen_cfg.get_user_clients(ctx),
    clients=self.clients,
    job_permission_cache=self.job_permission_cache,
    admin_permissions_cache=self.admin_permissions_cache
)
# SDKMethodRunner.retry_batch names its status argument `status_list`; the
# former `job_status=` keyword raised TypeError on every call.
# NOTE(review): the 'job_id' and 'status_filter' keys are kept as they were -
# confirm they match the field names declared for BatchRetryParams in the spec.
retry_result = mr.retry_batch(job_id=params.get('job_id'),
                              status_list=params.get('status_filter'),
                              as_admin=params.get('as_admin'))
#END retry_batch_jobs

# At some point might do deeper type checking...
Expand Down Expand Up @@ -1682,16 +1691,35 @@ def cancel_batch_job(self, ctx, params):
:param params: instance of type "BatchCancelParams" (batch_job_id:
BATCH_ID to cancel status_filter: optional filter of either
'terminated' or 'error'. Not setting this results in cancel of
both as_admin: retry someone else's job in your namespace) ->
structure: parameter "batch_job_id" of type "job_id" (A job id.),
parameter "status_filter" of list of type "job_status" (A job
state's job status.), parameter "as_admin" of type "boolean"
(@range [0,1])
both terminated_code: optional terminated code, default to
terminated by user as_admin: retry someone else's job in your
namespace @optional terminated_code) -> structure: parameter
"batch_job_id" of type "job_id" (A job id.), parameter
"status_filter" of list of type "job_status" (A job state's job
status.), parameter "terminated_code" of Long, parameter
"as_admin" of type "boolean" (@range [0,1])
:returns: instance of list of type "job_id" (A job id.)
"""
# ctx is the context object
# return variables are: job_ids
#BEGIN cancel_batch_job
# Build a per-request runner bound to the caller's credentials.
mr = SDKMethodRunner(
    user_clients=self.gen_cfg.get_user_clients(ctx),
    clients=self.clients,
    job_permission_cache=self.job_permission_cache,
    admin_permissions_cache=self.admin_permissions_cache
)
# Bind the result to `job_ids`, the name the generated return-value check and
# return statement read (it was previously bound to `returnVal` -> NameError).
# BatchCancelParams declares the batch id as `batch_job_id`, and carries an
# optional `terminated_code` that must be forwarded to the runner.
job_ids = mr.cancel_batch_job(job_id=params.get('batch_job_id'),
                              status_list=params.get('status_filter'),
                              terminated_code=params.get('terminated_code'),
                              as_admin=params.get('as_admin'))
#END cancel_batch_job
pass

# At some point might do deeper type checking...
if not isinstance(job_ids, list):
raise ValueError('Method cancel_batch_job return value ' +
'job_ids is not type list as required.')
# return the results
return [job_ids]

def check_job_canceled(self, ctx, params):
"""
Expand Down
35 changes: 34 additions & 1 deletion lib/execution_engine2/sdk/EE2Runjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
CannotRetryJob,
RetryFailureException,
InvalidParameterForBatch,
InvalidStatusListException,
NotBatchJobException,
)
from execution_engine2.sdk.EE2Constants import CONCIERGE_CLIENTGROUP
from execution_engine2.sdk.job_submission_parameters import (
Expand Down Expand Up @@ -744,7 +746,7 @@ def _retry(self, job_id: str, job: Job, batch_job: Job, as_admin: bool = False):
# to make sure the retried job is correctly submitted? Or save that for a unit test?
return {"job_id": job_id, "retry_id": retry_job_id}

def retry(self, job_id: str, as_admin=False) -> Dict[str, Optional[str]]:
def retry(self, job_id: str, as_admin: bool = False) -> Dict[str, Optional[str]]:
"""
#TODO Add new job requirements/cgroups as an optional param
:param job_id: The main job to retry
Expand All @@ -758,6 +760,37 @@ def retry(self, job_id: str, as_admin=False) -> Dict[str, Optional[str]]:
job_id=job_id, job=job, batch_job=batch_job, as_admin=as_admin
)

def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False):
    """
    Retry the child jobs of a batch job that are in one of the given states.

    :param job_id: The batch job id to retry jobs for
    :param status_list: The states of the child jobs in that batch to retry
    :param as_admin: Retry jobs for others
    :return: Whatever `retry_multiple` returns for the matching child job ids
    :raises InvalidStatusListException: If `status_list` is empty or contains
        a status that `_retryable` rejects
    :raises NotBatchJobException: If `job_id` does not refer to a batch job
    """
    valid_statuses = [Status.terminated.value, Status.error.value]

    # Validation
    if not status_list:
        raise InvalidStatusListException(
            f"Provide a list of status codes from {valid_statuses}."
        )
    for status in status_list:
        if not self._retryable(status):
            raise InvalidStatusListException(
                f"Provided status {status} not retryable . Status not in {valid_statuses}"
            )

    batch_job = self.sdkmr.get_job_with_permission(
        job_id, JobPermissions.WRITE, as_admin=as_admin
    )
    if not batch_job.batch_job:
        raise NotBatchJobException(f"{job_id} is not a batch job")

    # Retry and Report
    retryable_children = self.sdkmr.get_mongo_util().get_jobs_with_status(
        job_ids=batch_job.child_jobs, status_list=status_list, only_job_ids=True
    )
    # The lookup yields Job documents, while retry_multiple takes job ids,
    # so extract the ids as strings before delegating.
    retryable_child_job_ids = [str(child.id) for child in retryable_children]
    # Forward as_admin so that an admin retrying someone else's batch is also
    # treated as an admin for each child job.
    return self.retry_multiple(job_ids=retryable_child_job_ids, as_admin=as_admin)

def retry_multiple(
self, job_ids, as_admin=False
) -> List[Dict[str, Union[str, Any]]]:
Expand Down
74 changes: 68 additions & 6 deletions lib/execution_engine2/sdk/EE2Status.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from execution_engine2.exceptions import (
InvalidStatusTransitionException,
ChildrenNotFoundError,
InvalidStatusListException,
NotBatchJobException,
)
from execution_engine2.sdk.EE2Constants import JobError
from lib.execution_engine2.authorization.authstrategy import can_read_jobs
Expand Down Expand Up @@ -85,19 +87,79 @@ def handle_held_job(self, cluster_id, as_admin):
# There's probably a better way and a return type, but not really sure what I need yet
return json.loads(json.dumps(j.to_mongo().to_dict(), default=str))

def cancel_job(self, job_id, terminated_code=None, as_admin=False):
def cancel_batch_job(
    self, job_id, status_list, terminated_code=None, as_admin=False
):
    """
    Terminate the child jobs of a batch job that are in one of the given states.

    :param job_id: A batch job id to terminate child jobs of
    :param status_list: A list of statuses; child jobs in any of them are terminated
    :param terminated_code: The terminated code, passed through to cancel_job
    :param as_admin: Terminate jobs for others
    :return: A list of job ids (as strings) that were successfully terminated
    :raises InvalidStatusListException: If `status_list` is empty or contains a
        status other than created, queued, estimating or running
    :raises NotBatchJobException: If `job_id` does not refer to a batch job
    :raises Exception: If no child job could be terminated
    """
    # Validation
    valid_statuses = [
        Status.created.value,
        Status.queued.value,
        Status.estimating.value,
        Status.running.value,
    ]
    if not status_list:
        raise InvalidStatusListException(
            f"Provide a list of status codes from {valid_statuses}."
        )
    for status in status_list:
        if status not in valid_statuses:
            raise InvalidStatusListException(
                f"Provided status {status} not in {valid_statuses}."
            )

    batch_job = self.sdkmr.get_job_with_permission(
        job_id, JobPermissions.WRITE, as_admin=as_admin
    )
    if not batch_job.batch_job:
        raise NotBatchJobException(f"{job_id} is not a batch job")

    # Termination
    terminated_ids = []
    child_jobs = self.sdkmr.get_mongo_util().get_jobs_with_status(
        job_ids=batch_job.child_jobs, status_list=status_list
    )
    for job in child_jobs:
        try:
            self.cancel_job(
                job=job, terminated_code=terminated_code, as_admin=as_admin
            )
        except Exception:
            # Deliberate best-effort: a job might have finished by now, so a
            # failed cancel is skipped rather than aborting the whole batch.
            continue
        # The API contract is a list of job id strings, so stringify the
        # document id instead of returning the raw id object.
        terminated_ids.append(str(job.id))

    # Report
    if not terminated_ids:
        raise Exception(f"{job_id} didn't have any valid child jobs to terminate")
    return terminated_ids

def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False):
"""
Authorization Required: Ability to Read and Write to the Workspace
Default for terminated code is Terminated By User
Terminates child jobs as well
:param job_id: Job ID To cancel
:param terminated_code:
:param terminated_code: Default Terminated By User
:param as_admin: Cancel the job for a different user
"""
# Is it inefficient to get the job twice? Is it cached?
if (not job_id and not job) or (job_id and job):
raise Exception(
"Programming Error: Need to provide exactly one job id or a job object"
)

job = self.sdkmr.get_job_with_permission(
job_id, JobPermissions.WRITE, as_admin=as_admin
)
if job_id:
job = self.sdkmr.get_job_with_permission(
job_id, JobPermissions.WRITE, as_admin=as_admin
)

if terminated_code is None:
terminated_code = TerminatedCode.terminated_by_user.value
Expand Down
15 changes: 15 additions & 0 deletions lib/execution_engine2/sdk/SDKMethodRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ def retry(self, job_id, as_admin=False):
"""Authorization Required Read/Write"""
return self.get_runjob().retry(job_id=job_id, as_admin=as_admin)

def retry_batch(self, job_id, status_list, as_admin=False):
    """
    Authorization Required Read/Write

    Delegate a batch retry to the run-job helper, forwarding every argument
    by keyword, and hand back its result unchanged.
    """
    runjob = self.get_runjob()
    forwarded = {
        "job_id": job_id,
        "status_list": status_list,
        "as_admin": as_admin,
    }
    return runjob.retry_batch(**forwarded)

def run_job(self, params, as_admin=False):
"""Authorization Required Read/Write"""
return self.get_runjob().run(params=params, as_admin=as_admin)
Expand Down Expand Up @@ -394,6 +400,15 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False):
job_id=job_id, terminated_code=terminated_code, as_admin=as_admin
)

def cancel_batch_job(
    self, job_id, status_list, terminated_code=None, as_admin=False
):
    """
    Authorization Required Read/Write

    Delegate a batch cancel to the job-status helper and return its result.

    :param job_id: The batch job id whose child jobs should be cancelled
    :param status_list: Statuses of the child jobs to cancel
    :param terminated_code: Optional terminated code; defaults to None so the
        argument may be omitted, matching cancel_job and the delegate's own
        signature
    :param as_admin: Cancel jobs for others
    """
    return self.get_jobs_status().cancel_batch_job(
        job_id=job_id,
        terminated_code=terminated_code,
        status_list=status_list,
        as_admin=as_admin,
    )

def handle_held_job(self, cluster_id):
"""Authorization Required Read/Write"""
if self.check_as_admin(requested_perm=JobPermissions.WRITE):
Expand Down

0 comments on commit 2d3f5ee

Please sign in to comment.