diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 648f51edc5..eeb2955269 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -10,6 +10,7 @@ """ import logging import multiprocessing +import multiprocessing.synchronize import selectors import socket import threading @@ -33,20 +34,20 @@ class Acceptor(multiprocessing.Process): starts a new work thread. """ - lock = multiprocessing.Lock() - def __init__( self, idd: int, work_queue: connection.Connection, flags: Flags, work_klass: Type[ThreadlessWork], + lock: multiprocessing.synchronize.Lock, event_queue: Optional[EventQueue] = None) -> None: super().__init__() self.idd = idd self.work_queue: connection.Connection = work_queue self.flags = flags self.work_klass = work_klass + self.lock = lock self.event_queue = event_queue self.running = multiprocessing.Event() @@ -101,12 +102,13 @@ def start_work(self, conn: socket.socket, addr: Tuple[str, int]) -> None: work_thread.start() def run_once(self) -> None: - assert self.selector and self.sock with self.lock: + assert self.selector and self.sock events = self.selector.select(timeout=1) if len(events) == 0: return conn, addr = self.sock.accept() + # now = time.time() # fileno: int = conn.fileno() self.start_work(conn, addr) diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 4b83b9ae95..87d7158f26 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -24,6 +24,8 @@ logger = logging.getLogger(__name__) +LOCK = multiprocessing.Lock() + class AcceptorPool: """AcceptorPool. @@ -66,7 +68,8 @@ def start_workers(self) -> None: work_queue=work_queue[1], flags=self.flags, work_klass=self.work_klass, - event_queue=self.event_queue + lock=LOCK, + event_queue=self.event_queue, ) acceptor.start() logger.debug( diff --git a/tests/core/test_acceptor.py b/tests/core/test_acceptor.py index 537339d537..2b4dbd9bc3 100644 --- a/tests/core/test_acceptor.py +++ b/tests/core/test_acceptor.py @@ -29,6 +29,7 @@ def setUp(self) -> None: idd=self.acceptor_id, work_queue=self.pipe[1], flags=self.flags, + lock=multiprocessing.Lock(), work_klass=self.mock_protocol_handler) @mock.patch('selectors.DefaultSelector')