Skip to content
This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Commit

Permalink
fix: refactor job executor
Browse files Browse the repository at this point in the history
  • Loading branch information
miwurster committed Apr 11, 2024
1 parent 123f001 commit 3658044
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 137 deletions.
30 changes: 0 additions & 30 deletions src/TaskExecutor.py

This file was deleted.

29 changes: 18 additions & 11 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import logging
import os
import sys
import time
import uuid
from typing import List

from fastapi import FastAPI, Path, Query
from fastapi import FastAPI, Path, Query, BackgroundTasks
from loguru import logger

from src.execution_manager import create_execution, delete_execution, get_execution_result, get_execution_status
from src.helpers.date_formatter import format_timestamp
from src.helpers.logging import LogHandler
from src.job_executor import JobExecutor
from src.model.execution_input import ExecutionInput
from src.model.execution_output import ExecutionOutput
from src.model.execution_status import ExecutionStatus
from src.model.health_check import HealthCheck
from src.model.job import Job

Expand All @@ -23,6 +27,8 @@
logging.getLogger().setLevel(logging_level)
logger.configure(handlers=[{"sink": sys.stdout, "level": logging_level}])

job_executor = JobExecutor()


@app.get('/',
tags=["Status API"],
Expand All @@ -35,22 +41,24 @@ def health_check() -> HealthCheck:
tags=["Service API"],
summary="Asynchronous execution of the service",
status_code=201)
async def create(execution_input: ExecutionInput) -> Job:
return create_execution(execution_input)
async def create_job(execution_input: ExecutionInput, background_tasks: BackgroundTasks) -> Job:
job_id = str(uuid.uuid4())
background_tasks.add_task(job_executor.create_job, job_id, execution_input)
return Job(id=job_id, status=ExecutionStatus.PENDING, created_at=format_timestamp(time.time()))


@app.get('/{id}',
tags=["Service API"],
summary="Check execution status")
def get_status(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job:
return get_execution_status(job_id)
def get_job_status(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job:
return job_executor.get_job_status(job_id)


@app.get('/{id}/result',
tags=["Service API"],
summary="Get the result of an execution")
def get_result(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> ExecutionOutput:
return get_execution_result(job_id)
def get_job_result(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> ExecutionOutput:
return job_executor.get_job_result(job_id)


@app.get('/{id}/interim-results',
Expand All @@ -68,6 +76,5 @@ def get_interim_results(
@app.put('/{id}/cancel',
tags=["Service API"],
summary="Cancel an execution")
def delete(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job:
delete_execution(job_id)
return get_execution_status(job_id)
def cancel_job(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> Job:
return job_executor.cancel_job(job_id)
2 changes: 2 additions & 0 deletions src/execute_user_code.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import traceback
from typing import Optional

from loguru import logger

from src.user_code_runner import run_user_code


Expand Down
60 changes: 0 additions & 60 deletions src/execution_manager.py

This file was deleted.

6 changes: 4 additions & 2 deletions src/helpers/date_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from fastapi import HTTPException


def format_timestamp(timestamp_float):
def format_timestamp(timestamp: float | None):
if timestamp is None:
return None
try:
timestamp_datetime = datetime.fromtimestamp(timestamp_float)
timestamp_datetime = datetime.fromtimestamp(timestamp)
return timestamp_datetime.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error formatting timestamp: {str(e)}")
48 changes: 48 additions & 0 deletions src/job_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os
from concurrent.futures import ThreadPoolExecutor

from fastapi import HTTPException

from src.execute_user_code import execute_user_code
from src.job_state import JobState
from src.model.execution_input import ExecutionInput
from src.model.job import Job


class JobExecutor:
def __init__(self):
self.jobs = {}
self.executor = ThreadPoolExecutor(max_workers=3)

def create_job(self, job_id: str, execution_input: ExecutionInput) -> None:
entry_point = os.environ.get("ENTRY_POINT", "user_code.src.program:run")
future = self.executor.submit(execute_user_code, execution_input.data, execution_input.params, entry_point)

job = JobState(job_id, future)
self.jobs[job_id] = job

def get_job_status(self, job_id: str) -> Job:
job = self.jobs.get(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Not found")

return job.get_status()

def get_job_result(self, job_id: str):
job = self.jobs.get(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Not found")

result = job.get_result()
if result is None:
raise HTTPException(status_code=404, detail="Not found")

return result

def cancel_job(self, job_id: str):
job = self.jobs.get(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Not found")

job.cancel()
return job.get_status()
30 changes: 0 additions & 30 deletions src/job_manager.py

This file was deleted.

49 changes: 49 additions & 0 deletions src/job_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import time
from concurrent.futures import Future

from src.helpers.date_formatter import format_timestamp
from src.model.execution_output import ExecutionOutput
from src.model.execution_status import ExecutionStatus
from src.model.job import Job


class JobState:
def __init__(self, job_id: str, future: Future):
self.job_id = job_id
self.created_at = time.time()
self.started_at = time.time()
self.ended_at = None
self.future = future
self.future.add_done_callback(lambda f: self.__set_ended_at())

def has_finished(self):
return self.future.done()

def get_status(self) -> Job:
if self.future.done():
status = ExecutionStatus.SUCCEEDED
elif self.future.cancelled():
status = ExecutionStatus.CANCELLED
elif self.future.exception() is not None:
status = ExecutionStatus.FAILED
else:
status = ExecutionStatus.RUNNING

return Job(
id=self.job_id,
status=status,
created_at=format_timestamp(self.created_at),
started_at=format_timestamp(self.started_at),
ended_at=format_timestamp(self.ended_at),
)

def get_result(self) -> ExecutionOutput | None:
if self.has_finished():
return self.future.result()
return None

def cancel(self):
self.future.cancel()

def __set_ended_at(self):
self.ended_at = time.time()
6 changes: 3 additions & 3 deletions src/model/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
class Job(BaseModel):
id: str = Field(examples=["87cb778e-ac43-11ec-b909-0242ac120002"])
status: ExecutionStatus = Field(examples=["SUCCEEDED"])
createdAt: str = Field(examples=["2022-04-01 12:00:00"])
startedAt: str = Field(None, examples=["2022-04-01 12:00:00"])
endedAt: str = Field(None, examples=["2022-04-01 12:00:00"])
created_at: str = Field(examples=["2022-04-01 12:00:00"])
started_at: str = Field(None, examples=["2022-04-01 12:00:00"])
ended_at: str = Field(None, examples=["2022-04-01 12:00:00"])

model_config = {
"use_enum_values": True
Expand Down
2 changes: 1 addition & 1 deletion user_code/src/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ def run(**kwargs):
logger.info("loguru - data: {}", data)
logger.info("loguru - params: {}", params)

return {data: data, params: params}
return {"data": data, "params": params}

0 comments on commit 3658044

Please sign in to comment.