From d54c9454e0d06175eaee236de2549b22c867839a Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 21 Jan 2025 17:41:20 +0000
Subject: [PATCH] Move htex interchange tasks incoming thread into main thread (#3752)

This removes one of two non-main threads in the interchange - the task puller thread - and moves the behaviour there (receive a message and put it in an in-process queue) into the main thread (where that in-process queue is ultimately dequeued, anyway)

This is aimed at helping with ZMQ-vs-threads issues within the interchange -- most immediately, clean shutdown #3697

performance notes:

parsl-perf -t 30, my laptop, no logging

before this PR, 2320 tasks/second
post this PR, 2344 tasks/second

cc @rjmello who expressed especial interest in this

# Changed Behaviour

Some performance difference, although the brief measurements above are not concerning.

## Type of change

- New feature

---------

Co-authored-by: Kevin Hunter Kesling
---
 .../executors/high_throughput/interchange.py | 46 ++++++++-----------
 1 file changed, 19 insertions(+), 27 deletions(-)

diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index 12d3e07f31..0dfbc1ba33 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -132,6 +132,11 @@ def __init__(self,
         self.hub_zmq_port = hub_zmq_port
 
         self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
+
+        # count of tasks that have been received from the submit side
+        self.task_counter = 0
+
+        # count of tasks that have been sent out to worker pools
         self.count = 0
 
         self.worker_ports = worker_ports
@@ -201,28 +206,6 @@ def get_tasks(self, count: int) -> Sequence[dict]:
 
         return tasks
 
-    @wrap_with_logs(target="interchange")
-    def task_puller(self) -> NoReturn:
-        """Pull tasks from the incoming tasks zmq pipe onto the internal
-        pending task queue
-        """
-        logger.info("Starting")
-        task_counter = 0
-
-        while True:
-            logger.debug("launching recv_pyobj")
-            try:
-                msg = self.task_incoming.recv_pyobj()
-            except zmq.Again:
-                # We just timed out while attempting to receive
-                logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize()))
-                continue
-
-            logger.debug("putting message onto pending_task_queue")
-            self.pending_task_queue.put(msg)
-            task_counter += 1
-            logger.debug(f"Fetched {task_counter} tasks so far")
-
     def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
         if monitoring_radio:
             logger.info("Sending message {} to MonitoringHub".format(manager))
@@ -326,11 +309,6 @@ def start(self) -> None:
 
         start = time.time()
 
-        self._task_puller_thread = threading.Thread(target=self.task_puller,
-                                                    name="Interchange-Task-Puller",
-                                                    daemon=True)
-        self._task_puller_thread.start()
-
         self._command_thread = threading.Thread(target=self._command_server,
                                                 name="Interchange-Command",
                                                 daemon=True)
@@ -341,6 +319,7 @@ def start(self) -> None:
         poller = zmq.Poller()
         poller.register(self.task_outgoing, zmq.POLLIN)
         poller.register(self.results_incoming, zmq.POLLIN)
+        poller.register(self.task_incoming, zmq.POLLIN)
 
         # These are managers which we should examine in an iteration
         # for scheduling a job (or maybe any other attention?).
@@ -351,6 +330,7 @@ def start(self) -> None:
         while not kill_event.is_set():
             self.socks = dict(poller.poll(timeout=poll_period))
 
+            self.process_task_incoming()
             self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
             self.process_results_incoming(interesting_managers, monitoring_radio)
             self.expire_bad_managers(interesting_managers, monitoring_radio)
@@ -362,6 +342,18 @@ def start(self) -> None:
         logger.info(f"Processed {self.count} tasks in {delta} seconds")
         logger.warning("Exiting")
 
+    def process_task_incoming(self) -> None:
+        """Process incoming task message(s).
+        """
+
+        if self.task_incoming in self.socks and self.socks[self.task_incoming] == zmq.POLLIN:
+            logger.debug("start task_incoming section")
+            msg = self.task_incoming.recv_pyobj()
+            logger.debug("putting message onto pending_task_queue")
+            self.pending_task_queue.put(msg)
+            self.task_counter += 1
+            logger.debug(f"Fetched {self.task_counter} tasks so far")
+
     def process_task_outgoing_incoming(
             self,
             interesting_managers: Set[bytes],