Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into openmp
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Aug 4, 2023
2 parents 0ef68b3 + fe3d055 commit c87cef6
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 27 deletions.
4 changes: 4 additions & 0 deletions pympipool/interfaces/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ def __init__(self):
self._future_queue = queue.Queue()
self._process = None

@property
def future_queue(self):
return self._future_queue

def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Expand Down
16 changes: 8 additions & 8 deletions pympipool/interfaces/fluxbroker.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import os
import queue
import socket
from socket import gethostname
import sys
from time import sleep

from pympipool.shared.broker import (
_get_future_done,
_execute_task_dict,
get_future_done,
execute_task_dict,
)
from pympipool.interfaces.base import ExecutorBase
from pympipool.shared.thread import RaisingThread
from pympipool.shared.taskexecutor import (
cloudpickle_register,
_execute_parallel_tasks_loop,
execute_parallel_tasks_loop,
)
from pympipool.shared.connections import FluxPythonInterface
from pympipool.shared.communication import SocketInterface
Expand Down Expand Up @@ -152,7 +152,7 @@ def execute_parallel_tasks(
gpus_per_core=gpus_per_task,
executor=executor,
)
_execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)
execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)


def interface_bootup(
Expand All @@ -165,7 +165,7 @@ def interface_bootup(
):
command_lst += [
"--host",
socket.gethostname(),
gethostname(),
]
connections = FluxPythonInterface(
cwd=cwd,
Expand Down Expand Up @@ -210,7 +210,7 @@ def executor_broker(
except queue.Empty:
sleep(sleep_interval)
else:
if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
future_queue.task_done()
else:
future_queue.task_done()
Expand All @@ -227,7 +227,7 @@ def _get_executor_list(
executor=None,
):
return {
_get_future_done(): SingleTaskExecutor(
get_future_done(): SingleTaskExecutor(
cores=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_task=int(gpus_per_worker / cores_per_worker),
Expand Down
4 changes: 2 additions & 2 deletions pympipool/legacy/shared/backend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from tqdm import tqdm

from pympipool.shared.backend import call_funct, _update_default_dict_from_arguments
from pympipool.shared.backend import call_funct, update_default_dict_from_arguments


def map_funct(executor, funct, lst, chunksize=1, cores_per_task=1, map_flag=True):
Expand Down Expand Up @@ -42,7 +42,7 @@ def parse_arguments(argument_lst):
Returns:
dict: dictionary with the parsed arguments and their corresponding values
"""
return _update_default_dict_from_arguments(
return update_default_dict_from_arguments(
argument_lst=argument_lst,
argument_dict={
"total_cores": "--cores-total",
Expand Down
4 changes: 2 additions & 2 deletions pympipool/shared/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def parse_arguments(argument_lst):
Returns:
dict: dictionary with the parsed arguments and their corresponding values
"""
return _update_default_dict_from_arguments(
return update_default_dict_from_arguments(
argument_lst=argument_lst,
argument_dict={
"zmqport": "--zmqport",
Expand All @@ -50,7 +50,7 @@ def parse_arguments(argument_lst):
)


def _update_default_dict_from_arguments(argument_lst, argument_dict, default_dict):
def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict):
default_dict.update(
{
k: argument_lst[argument_lst.index(v) + 1]
Expand Down
10 changes: 5 additions & 5 deletions pympipool/shared/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ def executor_broker(
except queue.Empty:
sleep(sleep_interval)
else:
if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
future_queue.task_done()
else:
future_queue.task_done()
break


def _execute_task_dict(task_dict, meta_future_lst):
def execute_task_dict(task_dict, meta_future_lst):
if "fn" in task_dict.keys():
meta_future = next(as_completed(meta_future_lst.keys()))
executor = meta_future_lst.pop(meta_future)
executor._future_queue.put(task_dict)
executor.future_queue.put(task_dict)
meta_future_lst[task_dict["future"]] = executor
return True
elif "shutdown" in task_dict.keys() and task_dict["shutdown"]:
Expand All @@ -72,7 +72,7 @@ def _get_executor_list(
queue_adapter_kwargs=None,
):
return {
_get_future_done(): Executor(
get_future_done(): Executor(
cores=cores_per_worker,
gpus_per_task=int(gpus_per_worker / cores_per_worker),
oversubscribe=oversubscribe,
Expand All @@ -87,7 +87,7 @@ def _get_executor_list(
}


def _get_future_done():
def get_future_done():
f = Future()
f.set_result(True)
return f
4 changes: 2 additions & 2 deletions pympipool/shared/communication.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import cloudpickle
import socket
from socket import gethostname
import zmq

from pympipool.shared.connections import get_connection_interface
Expand Down Expand Up @@ -111,7 +111,7 @@ def interface_bootup(
if enable_flux_backend or enable_slurm_backend or queue_adapter is not None:
command_lst += [
"--host",
socket.gethostname(),
gethostname(),
]
connections = get_connection_interface(
cwd=cwd,
Expand Down
4 changes: 2 additions & 2 deletions pympipool/shared/taskexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ def execute_parallel_tasks(
queue_adapter=queue_adapter,
queue_adapter_kwargs=queue_adapter_kwargs,
)
_execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)
execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)


def _execute_parallel_tasks_loop(interface, future_queue):
def execute_parallel_tasks_loop(interface, future_queue):
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
Expand Down
12 changes: 6 additions & 6 deletions tests/test_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import unittest
from pympipool.shared.broker import (
executor_broker,
_execute_task_dict,
_get_future_done,
execute_task_dict,
get_future_done,
_get_executor_list,
)

Expand All @@ -24,7 +24,7 @@ def mpi_funct(i):

class TestFutureCreation(unittest.TestCase):
def test_get_future_done(self):
f = _get_future_done()
f = get_future_done()
self.assertTrue(isinstance(f, Future))
self.assertTrue(f.done())

Expand All @@ -43,21 +43,21 @@ def test_meta_executor_future(self):
def test_execute_task_dict(self):
meta_future_lst = _get_executor_list(max_workers=1)
f = Future()
self.assertTrue(_execute_task_dict(
self.assertTrue(execute_task_dict(
task_dict={"fn": calc, "args": (1,), "kwargs": {}, "future": f},
meta_future_lst=meta_future_lst
))
self.assertEqual(f.result(), 1)
self.assertTrue(f.done())
self.assertFalse(_execute_task_dict(
self.assertFalse(execute_task_dict(
task_dict={"shutdown": True, "wait": True},
meta_future_lst=meta_future_lst
))

def test_execute_task_dict_error(self):
meta_future_lst = _get_executor_list(max_workers=1)
with self.assertRaises(ValueError):
_execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst)
execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst)
list(meta_future_lst.values())[0].shutdown(wait=True)

def test_executor_broker(self):
Expand Down

0 comments on commit c87cef6

Please sign in to comment.