Skip to content

Commit

Permalink
clean up finished workers
Browse files Browse the repository at this point in the history
  • Loading branch information
saleh-mir committed Sep 14, 2024
1 parent c3c9442 commit c630515
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions jesse/services/multiprocessing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import threading
import time
from typing import List
import multiprocessing as mp
import traceback
Expand Down Expand Up @@ -42,6 +44,8 @@ def __init__(self):
self._pid_to_client_id_map = {}
self.client_id_to_pid_to_map = {}
self._active_workers_key = f"{ENV_VALUES['APP_PORT']}|active-processes"
self._cleanup_thread = threading.Thread(target=self._cleanup_finished_workers, daemon=True)
self._cleanup_thread.start()

def _reset(self):
self._workers = []
Expand Down Expand Up @@ -76,7 +80,6 @@ def get_client_id(self, pid):
client_id: str = self._pid_to_client_id_map[self._prefixed_pid(pid)]
except KeyError:
return None
# return after "|" because we add them before sending it to multiprocessing
return jh.string_after_character(client_id, '|')

def get_pid(self, client_id):
Expand All @@ -90,7 +93,16 @@ def flush(self):
w.terminate()
w.join()
w.close()
process_manager._reset()
self._reset()

def _cleanup_finished_workers(self):
while True:
for w in self._workers:
if not w.is_alive():
w.join()
w.close()
self._workers.remove(w)
time.sleep(5)

@property
def active_workers(self) -> set:
Expand Down

0 comments on commit c630515

Please sign in to comment.