Skip to content

Commit

Permalink
Programmer error protection: there can be only one (#3756)
Browse files Browse the repository at this point in the history
Just assert what's already happening (and _should_ be happening). Use it
to remove an indent level.

# Changed Behaviour

Should be no changed behavior for users.

## Type of change

- Code maintenance/cleanup
  • Loading branch information
khk-globus authored Jan 26, 2025
1 parent 572d411 commit 47e60f0
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,6 @@ def start(self):
self._start_result_queue_thread()
self._start_local_interchange_process()

logger.debug("Created result queue thread: %s", self._result_queue_thread)

self.initialize_scaling()

@wrap_with_logs
Expand Down Expand Up @@ -529,6 +527,8 @@ def _start_local_interchange_process(self) -> None:
get the worker task and result ports that the interchange has bound to.
"""

assert self.interchange_proc is None, f"Already exists! {self.interchange_proc!r}"

interchange_config = {"client_address": self.loopback_address,
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
Expand Down Expand Up @@ -563,23 +563,26 @@ def _start_local_interchange_process(self) -> None:
except CommandClientTimeoutError:
logger.error("Interchange has not completed initialization. Aborting")
raise Exception("Interchange failed to start")
logger.debug("Got worker ports")
logger.debug(
"Interchange process started (%r). Worker ports: %d, %d",
self.interchange_proc,
self.worker_task_port,
self.worker_result_port
)

def _start_result_queue_thread(self):
"""Method to start the result queue thread as a daemon.
Checks if a thread already exists, then starts it.
Could be used later as a restart if the result queue thread dies.
"""
if self._result_queue_thread is None:
logger.debug("Starting result queue thread")
self._result_queue_thread = threading.Thread(target=self._result_queue_worker, name="HTEX-Result-Queue-Thread")
self._result_queue_thread.daemon = True
self._result_queue_thread.start()
logger.debug("Started result queue thread")
assert self._result_queue_thread is None, f"Already exists! {self._result_queue_thread!r}"

else:
logger.error("Result queue thread already exists, returning")
logger.debug("Starting result queue thread")
self._result_queue_thread = threading.Thread(target=self._result_queue_worker, name="HTEX-Result-Queue-Thread")
self._result_queue_thread.daemon = True
self._result_queue_thread.start()
logger.debug("Started result queue thread: %r", self._result_queue_thread)

def hold_worker(self, worker_id: str) -> None:
"""Puts a worker on hold, preventing scheduling of additional tasks to it.
Expand Down

0 comments on commit 47e60f0

Please sign in to comment.