Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Subprocess queueing and pooling management
Browse files Browse the repository at this point in the history
Replace the use of multiprocessing pool with in-house logic to manage
queueing and launching of external commands, and of running
subprocesses.

Avoid creating unused child processes. Avoid having to start up pool
early.

Adjustable main loop sleep time.
matthewrmshin committed Feb 28, 2018
1 parent e54f578 commit 7fa49bd
Showing 9 changed files with 234 additions and 245 deletions.
2 changes: 1 addition & 1 deletion bin/cylc-check-versions
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ def main():
account_set.remove((user, host_str))
accounts.append((user, res))
if account_set:
task_remote_mgr.proc_pool.handle_results_async()
task_remote_mgr.proc_pool.process()
sleep(1.0)

# Interrogate the each remote account with CYLC_VERSION set to our version.
10 changes: 4 additions & 6 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
@@ -125,7 +125,7 @@ def main():
for itask in prep_tasks + bad_tasks:
waiting_tasks.remove(itask)
if waiting_tasks:
task_job_mgr.proc_pool.handle_results_async()
task_job_mgr.proc_pool.process()
sleep(1.0)

for itask in itasks:
@@ -140,12 +140,10 @@ def main():
for itask in task_job_mgr.submit_task_jobs(suite, waiting_tasks):
waiting_tasks.remove(itask)
if waiting_tasks:
task_job_mgr.proc_pool.handle_results_async()
task_job_mgr.proc_pool.process()
sleep(1.0)
while task_job_mgr.proc_pool.results:
task_job_mgr.proc_pool.handle_results_async()
task_job_mgr.proc_pool.close()
task_job_mgr.proc_pool.join()
while task_job_mgr.proc_pool.is_not_done():
task_job_mgr.proc_pool.process()
for itask in itasks:
if itask.summary.get('submit_method_id') is not None:
print('[%s] Job ID: %s' % (
Loading

0 comments on commit 7fa49bd

Please sign in to comment.