diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 828ea352da..0c598ac3cc 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -431,8 +431,6 @@ def start(self): self._start_result_queue_thread() self._start_local_interchange_process() - logger.debug("Created result queue thread: %s", self._result_queue_thread) - self.initialize_scaling() @wrap_with_logs @@ -529,6 +527,8 @@ def _start_local_interchange_process(self) -> None: get the worker task and result ports that the interchange has bound to. """ + assert self.interchange_proc is None, f"Already exists! {self.interchange_proc!r}" + interchange_config = {"client_address": self.loopback_address, "client_ports": (self.outgoing_q.port, self.incoming_q.port, @@ -563,7 +563,12 @@ def _start_local_interchange_process(self) -> None: except CommandClientTimeoutError: logger.error("Interchange has not completed initialization. Aborting") raise Exception("Interchange failed to start") - logger.debug("Got worker ports") + logger.debug( + "Interchange process started (%r). Worker ports: %d, %d", + self.interchange_proc, + self.worker_task_port, + self.worker_result_port + ) def _start_result_queue_thread(self): """Method to start the result queue thread as a daemon. @@ -571,15 +576,13 @@ def _start_result_queue_thread(self): Checks if a thread already exists, then starts it. Could be used later as a restart if the result queue thread dies. """ - if self._result_queue_thread is None: - logger.debug("Starting result queue thread") - self._result_queue_thread = threading.Thread(target=self._result_queue_worker, name="HTEX-Result-Queue-Thread") - self._result_queue_thread.daemon = True - self._result_queue_thread.start() - logger.debug("Started result queue thread") + assert self._result_queue_thread is None, f"Already exists! {self._result_queue_thread!r}" - else: - logger.error("Result queue thread already exists, returning") + logger.debug("Starting result queue thread") + self._result_queue_thread = threading.Thread(target=self._result_queue_worker, name="HTEX-Result-Queue-Thread") + self._result_queue_thread.daemon = True + self._result_queue_thread.start() + logger.debug("Started result queue thread: %r", self._result_queue_thread) def hold_worker(self, worker_id: str) -> None: """Puts a worker on hold, preventing scheduling of additional tasks to it.