From 365804409c06ad4673be9a47a4e3b7a67985bd7b Mon Sep 17 00:00:00 2001 From: Michael Wurster Date: Thu, 11 Apr 2024 13:54:47 +0200 Subject: [PATCH] fix: refactor job executor --- src/TaskExecutor.py | 30 ------------------ src/app.py | 29 ++++++++++------- src/execute_user_code.py | 2 ++ src/execution_manager.py | 60 ----------------------------------- src/helpers/date_formatter.py | 6 ++-- src/job_executor.py | 48 ++++++++++++++++++++++++++++ src/job_manager.py | 30 ------------------ src/job_state.py | 49 ++++++++++++++++++++++++++++ src/model/job.py | 6 ++-- user_code/src/program.py | 2 +- 10 files changed, 125 insertions(+), 137 deletions(-) delete mode 100644 src/TaskExecutor.py delete mode 100644 src/execution_manager.py create mode 100644 src/job_executor.py delete mode 100644 src/job_manager.py create mode 100644 src/job_state.py diff --git a/src/TaskExecutor.py b/src/TaskExecutor.py deleted file mode 100644 index 9ea9365..0000000 --- a/src/TaskExecutor.py +++ /dev/null @@ -1,30 +0,0 @@ -import concurrent.futures - -# The TaskExecutor is used as a container of future objects. -class TaskExecutor(concurrent.futures.ThreadPoolExecutor): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._futures = {} - - """ - Submits an async task for execution. - Parameters: - fn (callable): The function to execute. - params (dict): Parameters to pass to the function. - data (dict): Additional data to pass to the function. - key (str): A unique key to identify the task. - is_done_callback (callable): A callback function that will be executed after the task is done. - Returns: - Void - """ - def submit(self, fn, params, data, entry_point, key, is_done_callback): - future = super().submit(fn, params, data, entry_point) - future.add_done_callback(is_done_callback) - self._futures[key] = future - - def get(self, key): - return self._futures.get(key) - - def delete(self, key): - if self._futures.get(key): - del self._futures[key] diff --git a/src/app.py b/src/app.py index 90b0fab..f8d3de0 100644 --- a/src/app.py +++ b/src/app.py @@ -1,15 +1,19 @@ import logging import os import sys +import time +import uuid from typing import List -from fastapi import FastAPI, Path, Query +from fastapi import FastAPI, Path, Query, BackgroundTasks from loguru import logger -from src.execution_manager import create_execution, delete_execution, get_execution_result, get_execution_status +from src.helpers.date_formatter import format_timestamp from src.helpers.logging import LogHandler +from src.job_executor import JobExecutor from src.model.execution_input import ExecutionInput from src.model.execution_output import ExecutionOutput +from src.model.execution_status import ExecutionStatus from src.model.health_check import HealthCheck from src.model.job import Job @@ -23,6 +27,8 @@ logging.getLogger().setLevel(logging_level) logger.configure(handlers=[{"sink": sys.stdout, "level": logging_level}]) +job_executor = JobExecutor() + @app.get('/', tags=["Status API"], @@ -35,22 +41,24 @@ def health_check() -> HealthCheck: tags=["Service API"], summary="Asynchronous execution of the service", status_code=201) -async def create(execution_input: ExecutionInput) -> Job: - return create_execution(execution_input) +async def create_job(execution_input: ExecutionInput, background_tasks: BackgroundTasks) -> Job: + job_id = str(uuid.uuid4()) + background_tasks.add_task(job_executor.create_job, job_id, execution_input) + return Job(id=job_id, status=ExecutionStatus.PENDING, created_at=format_timestamp(time.time())) @app.get('/{id}', tags=["Service API"], summary="Check execution status") -def get_status(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job: - return get_execution_status(job_id) +def get_job_status(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job: + return job_executor.get_job_status(job_id) @app.get('/{id}/result', tags=["Service API"], summary="Get the result of an execution") -def get_result(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> ExecutionOutput: - return get_execution_result(job_id) +def get_job_result(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> ExecutionOutput: + return job_executor.get_job_result(job_id) @app.get('/{id}/interim-results', @@ -68,6 +76,5 @@ def get_interim_results( @app.put('/{id}/cancel', tags=["Service API"], summary="Cancel an execution") -def delete(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job: - delete_execution(job_id) - return get_execution_status(job_id) +def cancel_job(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job: + return job_executor.cancel_job(job_id) diff --git a/src/execute_user_code.py b/src/execute_user_code.py index aabb7ff..f9698c0 100644 --- a/src/execute_user_code.py +++ b/src/execute_user_code.py @@ -1,6 +1,8 @@ import traceback from typing import Optional + from loguru import logger + from src.user_code_runner import run_user_code diff --git a/src/execution_manager.py b/src/execution_manager.py deleted file mode 100644 index c556bc2..0000000 --- a/src/execution_manager.py +++ /dev/null @@ -1,60 +0,0 @@ -import os -import traceback -import uuid -from typing import Optional - -from fastapi import HTTPException -from loguru import logger - -from src.TaskExecutor import TaskExecutor -from src.execute_user_code import execute_user_code -from src.job_manager import create_job, delete_job, update_job_by_status -from src.model.execution_input import ExecutionInput -from src.model.execution_status import ExecutionStatus -from src.user_code_runner import run_user_code - -executor = TaskExecutor() - - -def update_job(key, future): - if not future: - raise HTTPException(status_code=404, detail="Execution does not exist") - - if not future.done(): - executionStatus = ExecutionStatus.RUNNING - elif future.cancelled(): - executionStatus = ExecutionStatus.CANCELLED - elif future.exception() is not None: - executionStatus = ExecutionStatus.FAILED - else: - executionStatus = ExecutionStatus.SUCCEEDED - - return update_job_by_status(key, executionStatus) - - -def create_execution(input: ExecutionInput): - id = str(uuid.uuid4()) - entry_point = os.environ.get("ENTRY_POINT", "user_code.src.program:run") - executor.submit(execute_user_code, input.data, input.params, entry_point, id, lambda f: update_job(id, f)) - return create_job(id) - - -def get_execution_status(id): - future = executor.get(id) - return update_job(id, future) - - -def get_execution_result(id): - future = executor.get(id) - if future is None: - raise HTTPException(status_code=404, detail="Execution does not exist") - - if future.done(): - response = future.result() - return response - return (f"Current execution status is: {ExecutionStatus.RUNNING}") - - -def delete_execution(id): - delete_job(id) - executor.delete(id) diff --git a/src/helpers/date_formatter.py b/src/helpers/date_formatter.py index d0137d7..8da74e3 100644 --- a/src/helpers/date_formatter.py +++ b/src/helpers/date_formatter.py @@ -3,9 +3,11 @@ from fastapi import HTTPException -def format_timestamp(timestamp_float): +def format_timestamp(timestamp: float | None): + if timestamp is None: + return None try: - timestamp_datetime = datetime.fromtimestamp(timestamp_float) + timestamp_datetime = datetime.fromtimestamp(timestamp) return timestamp_datetime.strftime("%Y-%m-%d %H:%M:%S") except Exception as e: raise HTTPException(status_code=500, detail=f"Error formatting timestamp: {str(e)}") diff --git a/src/job_executor.py b/src/job_executor.py new file mode 100644 index 0000000..d2d91fe --- /dev/null +++ b/src/job_executor.py @@ -0,0 +1,48 @@ +import os +from concurrent.futures import ThreadPoolExecutor + +from fastapi import HTTPException + +from src.execute_user_code import execute_user_code +from src.job_state import JobState +from src.model.execution_input import ExecutionInput +from src.model.job import Job + + +class JobExecutor: + def __init__(self): + self.jobs = {} + self.executor = ThreadPoolExecutor(max_workers=3) + + def create_job(self, job_id: str, execution_input: ExecutionInput) -> None: + entry_point = os.environ.get("ENTRY_POINT", "user_code.src.program:run") + future = self.executor.submit(execute_user_code, execution_input.data, execution_input.params, entry_point) + + job = JobState(job_id, future) + self.jobs[job_id] = job + + def get_job_status(self, job_id: str) -> Job: + job = self.jobs.get(job_id) + if job is None: + raise HTTPException(status_code=404, detail="Not found") + + return job.get_status() + + def get_job_result(self, job_id: str): + job = self.jobs.get(job_id) + if job is None: + raise HTTPException(status_code=404, detail="Not found") + + result = job.get_result() + if result is None: + raise HTTPException(status_code=404, detail="Not found") + + return result + + def cancel_job(self, job_id: str): + job = self.jobs.get(job_id) + if job is None: + raise HTTPException(status_code=404, detail="Not found") + + job.cancel() + return job.get_status() diff --git a/src/job_manager.py b/src/job_manager.py deleted file mode 100644 index 87b9d69..0000000 --- a/src/job_manager.py +++ /dev/null @@ -1,30 +0,0 @@ -from fastapi import HTTPException -from src.model.execution_status import ExecutionStatus -from src.helpers.date_formatter import format_timestamp -from src.model.job import Job -import time - -jobs = {} - -def create_job(job_id): - job = Job(id = job_id, status = ExecutionStatus.PENDING, createdAt = format_timestamp(time.time()), startedAt = format_timestamp(time.time())) - jobs[job_id] = job - return job - -def update_job_by_status(id, status: ExecutionStatus): - job = jobs.get(id) - if not job: - raise HTTPException(status_code=404, detail="Execution does not exist") - - job.status = status - - if not status == ExecutionStatus.RUNNING: - job.endedAt = format_timestamp(time.time()) - - jobs[id] = job - return job - -def delete_job(id): - if not jobs.get(id): - raise HTTPException(status_code=404, detail="Execution does not exist") - del jobs[id] diff --git a/src/job_state.py b/src/job_state.py new file mode 100644 index 0000000..ed232e7 --- /dev/null +++ b/src/job_state.py @@ -0,0 +1,49 @@ +import time +from concurrent.futures import Future + +from src.helpers.date_formatter import format_timestamp +from src.model.execution_output import ExecutionOutput +from src.model.execution_status import ExecutionStatus +from src.model.job import Job + + +class JobState: + def __init__(self, job_id: str, future: Future): + self.job_id = job_id + self.created_at = time.time() + self.started_at = time.time() + self.ended_at = None + self.future = future + self.future.add_done_callback(lambda f: self.__set_ended_at()) + + def has_finished(self): + return self.future.done() + + def get_status(self) -> Job: + if self.future.done(): + status = ExecutionStatus.SUCCEEDED + elif self.future.cancelled(): + status = ExecutionStatus.CANCELLED + elif self.future.exception() is not None: + status = ExecutionStatus.FAILED + else: + status = ExecutionStatus.RUNNING + + return Job( + id=self.job_id, + status=status, + created_at=format_timestamp(self.created_at), + started_at=format_timestamp(self.started_at), + ended_at=format_timestamp(self.ended_at), + ) + + def get_result(self) -> ExecutionOutput | None: + if self.has_finished(): + return self.future.result() + return None + + def cancel(self): + self.future.cancel() + + def __set_ended_at(self): + self.ended_at = time.time() diff --git a/src/model/job.py b/src/model/job.py index 8b62817..e773d17 100644 --- a/src/model/job.py +++ b/src/model/job.py @@ -6,9 +6,9 @@ class Job(BaseModel): id: str = Field(examples=["87cb778e-ac43-11ec-b909-0242ac120002"]) status: ExecutionStatus = Field(examples=["SUCCEEDED"]) - createdAt: str = Field(examples=["2022-04-01 12:00:00"]) - startedAt: str = Field(None, examples=["2022-04-01 12:00:00"]) - endedAt: str = Field(None, examples=["2022-04-01 12:00:00"]) + created_at: str = Field(examples=["2022-04-01 12:00:00"]) + started_at: str = Field(None, examples=["2022-04-01 12:00:00"]) + ended_at: str = Field(None, examples=["2022-04-01 12:00:00"]) model_config = { "use_enum_values": True diff --git a/user_code/src/program.py b/user_code/src/program.py index 43c56d9..7dc5e25 100644 --- a/user_code/src/program.py +++ b/user_code/src/program.py @@ -18,4 +18,4 @@ def run(**kwargs): logger.info("loguru - data: {}", data) logger.info("loguru - params: {}", params) - return {data: data, params: params} + return {"data": data, "params": params}