From 2eac0fdd2fb2465f57ff6f6503e032039ff2e01e Mon Sep 17 00:00:00 2001 From: Cedric Leroy Date: Sat, 31 Mar 2018 10:10:10 -0500 Subject: [PATCH 1/7] mac files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index d64196b..3170c5c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ venv*/ *.pyc *.vscode/ *.pytest_cache/ +.DS_Store From 895e480e146d68e6c117926fc4663fa6ed9e4deb Mon Sep 17 00:00:00 2001 From: Cedric Leroy Date: Sat, 31 Mar 2018 10:15:14 -0500 Subject: [PATCH 2/7] multiprocessing for running functions in parralel --- pyungo/core.py | 39 +++++++++++++++++++++++++++++++++++++-- tests/test_core.py | 25 +++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/pyungo/core.py b/pyungo/core.py index 98b836c..80c4db1 100644 --- a/pyungo/core.py +++ b/pyungo/core.py @@ -1,5 +1,6 @@ from functools import reduce from copy import deepcopy +from multiprocessing.dummy import Pool as ThreadPool class PyungoError(Exception): @@ -67,11 +68,20 @@ def output_names(self): def fct_name(self): return self._fct.__name__ + def load_inputs(self, data_to_pass, kwargs_to_pass): + self._data_to_pass = data_to_pass + self._kwargs_to_pass = kwargs_to_pass + + def run_with_loaded_inputs(self): + return self(self._data_to_pass, **self._kwargs_to_pass) + class Graph: - def __init__(self): + def __init__(self, parallel=False, pool_size=2): self._nodes = [] self._data = None + self._parallel = parallel + self._pool_size = pool_size @property def data(self): @@ -100,6 +110,10 @@ def dag(self): ordered_nodes.append(nodes) return ordered_nodes + @staticmethod + def run_node(node): + return (node.id, node.run_with_loaded_inputs()) + def _register(self, f, **kwargs): input_names = kwargs.get('inputs') args_names = kwargs.get('args') @@ -165,6 +179,7 @@ def calculate(self, data): dep = self._dependencies() sorted_dep = topological_sort(dep) for items in sorted_dep: + # loading node with inputs for item in items: node 
= self._get_node(item) args = [i_name for i_name in node.input_names if i_name not in node.kwargs] @@ -174,7 +189,27 @@ def calculate(self, data): kwargs_to_pass = {} for kwarg in node.kwargs: kwargs_to_pass[kwarg] = self._data[kwarg] - res = node(data_to_pass, **kwargs_to_pass) + node.load_inputs(data_to_pass, kwargs_to_pass) + # running nodes + if self._parallel: + pool = ThreadPool(self._pool_size) + results = pool.map( + Graph.run_node, + [self._get_node(i) for i in items] + ) + pool.close() + pool.join() + results = {k: v for k, v in results} + else: + results = {} + for item in items: + node = self._get_node(item) + res = node.run_with_loaded_inputs() + results[node.id] = res + # save results + for item in items: + node = self._get_node(item) + res = results[node.id] if len(node.output_names) == 1: self._data[node.output_names[0]] = res else: diff --git a/tests/test_core.py b/tests/test_core.py index 44e735f..56e93b1 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -45,6 +45,31 @@ def f_my_function2(c): assert graph.data['e'] == -1.5 +def test_simple_parralel(): + """ TODO: We could mock and make sure things are called correctly """ + + graph = Graph(parallel=True) + + def f_my_function(a, b): + return a + b + + def f_my_function3(d, a): + return d - a + + def f_my_function2(c): + return c / 10. 
+ + graph.add_node(f_my_function, inputs=['a', 'b'], outputs=['c']) + graph.add_node(f_my_function3, inputs=['d', 'a'], outputs=['e']) + graph.add_node(f_my_function2, inputs=['c'], outputs=['d']) + graph.add_node(f_my_function2, inputs=['c'], outputs=['f']) + graph.add_node(f_my_function2, inputs=['c'], outputs=['g']) + + res = graph.calculate(data={'a': 2, 'b': 3}) + + assert res == -1.5 + + def test_multiple_outputs(): graph = Graph() From fbf0d617c0e9855847d4ef012cefc345240887aa Mon Sep 17 00:00:00 2001 From: Cedric Leroy Date: Sat, 31 Mar 2018 10:24:34 -0500 Subject: [PATCH 3/7] add check on add_node mandatory parameter --- pyungo/core.py | 6 +++++- tests/test_core.py | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pyungo/core.py b/pyungo/core.py index 80c4db1..8329935 100644 --- a/pyungo/core.py +++ b/pyungo/core.py @@ -116,9 +116,13 @@ def run_node(node): def _register(self, f, **kwargs): input_names = kwargs.get('inputs') + if not input_names: + raise PyungoError('Missing inputs parameter') + output_names = kwargs.get('outputs') + if not output_names: + raise PyungoError('Missing outputs parameters') args_names = kwargs.get('args') kwargs_names = kwargs.get('kwargs') - output_names = kwargs.get('outputs') self._create_node( f, input_names, output_names, args_names, kwargs_names ) diff --git a/tests/test_core.py b/tests/test_core.py index 56e93b1..75260bf 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -154,7 +154,7 @@ def f_my_function2(c): with pytest.raises(PyungoError) as err: graph.calculate(data={'a': 6, 'b': 4}) - + assert "A cyclic dependency exists amongst" in str(err.value) @@ -224,3 +224,19 @@ def f_my_function2(c): dag = graph.dag for i, fct_name in enumerate(expected): assert dag[i][0].fct_name == fct_name + + +def test_missing_inputs(): + + graph = Graph(parallel=True) + + def f_my_function(a, b): + return a + b + + with pytest.raises(PyungoError) as err: + graph.add_node(f_my_function, 
outputs=['c']) + assert "Missing inputs parameter" in str(err.value) + + with pytest.raises(PyungoError) as err: + graph.add_node(f_my_function, inputs=['a', 'b']) + assert "Missing outputs parameter" in str(err.value) From 0a5863e8ab4090921f6b059f977d2baae5ec9914 Mon Sep 17 00:00:00 2001 From: Cedric Leroy Date: Sat, 31 Mar 2018 10:46:38 -0500 Subject: [PATCH 4/7] add some logging --- pyungo/core.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pyungo/core.py b/pyungo/core.py index 8329935..195941d 100644 --- a/pyungo/core.py +++ b/pyungo/core.py @@ -1,8 +1,15 @@ -from functools import reduce from copy import deepcopy +import datetime as dt +from functools import reduce +import logging from multiprocessing.dummy import Pool as ThreadPool +logging.basicConfig() +LOGGER = logging.getLogger() +LOGGER.setLevel(logging.INFO) + + class PyungoError(Exception): pass @@ -43,7 +50,11 @@ def __repr__(self): ) def __call__(self, args, **kwargs): - return self._fct(*args, **kwargs) + t1 = dt.datetime.utcnow() + res = self._fct(*args, **kwargs) + t2 = dt.datetime.utcnow() + LOGGER.info('Ran {} in {}'.format(self, t2-t1)) + return res @property def id(self): @@ -178,6 +189,8 @@ def _check_inputs(self, data): raise PyungoError(msg) def calculate(self, data): + t1 = dt.datetime.utcnow() + LOGGER.info('Starting calculation...') self._data = deepcopy(data) self._check_inputs(data) dep = self._dependencies() @@ -219,4 +232,6 @@ def calculate(self, data): else: for i, out in enumerate(node.output_names): self._data[out] = res[i] + t2 = dt.datetime.utcnow() + LOGGER.info('Calculation finished in {}'.format(t2-t1)) return res From d06a6a76f1790e191ed5690fa69519d56e63661a Mon Sep 17 00:00:00 2001 From: Cedric Leroy Date: Tue, 3 Apr 2018 07:54:13 -0500 Subject: [PATCH 5/7] example with pvlib --- examples/pvlib_ex.py | 148 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 examples/pvlib_ex.py diff --git 
a/examples/pvlib_ex.py b/examples/pvlib_ex.py new file mode 100644 index 0000000..dfdabfc --- /dev/null +++ b/examples/pvlib_ex.py @@ -0,0 +1,148 @@ +""" Bigger example on solar PV modeling using PVLib + +https://github.com/pvlib/pvlib-python + +Example: https://github.com/pvlib/pvlib-python/blob/master/docs/tutorials/tmy_to_power.ipynb +""" + +import inspect +import os +from pprint import pprint +import warnings +try: + import pvlib +except ImportError: + raise ImportError('This example require pvlib') +from pvlib import pvsystem +from pyungo.core import Graph + + +warnings.filterwarnings("ignore") + + +pvlib_abspath = os.path.dirname(os.path.abspath(inspect.getfile(pvlib))) +datapath = os.path.join(pvlib_abspath, 'data', '703165TY.csv') +tmy_data, meta = pvlib.tmy.readtmy3(datapath, coerce_year=2015) +tmy_data.index.name = 'Time' +tmy_data = tmy_data.shift(freq='-30Min') +sandia_modules = pvlib.pvsystem.retrieve_sam(name='SandiaMod') +cec_modules = pvlib.pvsystem.retrieve_sam(name='CECMod') +sapm_inverters = pvlib.pvsystem.retrieve_sam('sandiainverter') +sapm_inverter = sapm_inverters['ABB__MICRO_0_25_I_OUTD_US_208_208V__CEC_2014_'] + +data = { + 'index': tmy_data.index, + 'surface_tilt': 30, + 'surface_azimuth': 180, + 'DHI': tmy_data['DHI'], + 'DNI': tmy_data['DNI'], + 'GHI': tmy_data['GHI'], + 'Wspd': tmy_data['Wspd'], + 'DryBulb': tmy_data['DryBulb'], + 'albedo': 0.2, + 'latitude': meta['latitude'], + 'longitude': meta['longitude'], + 'sandia_module': sandia_modules.Canadian_Solar_CS5P_220M___2009_, + 'cec_module': cec_modules.Canadian_Solar_CS5P_220M, + 'alpha_sc': cec_modules.Canadian_Solar_CS5P_220M['alpha_sc'], + 'EgRef': 1.121, + 'dEgdT': -0.0002677, + 'inverter': sapm_inverter +} + + +def unpack_df(f): + def wrap(*args, **kwargs): + res = f(*args, **kwargs) + return res.values.T + return wrap + + +def unpack_dict(f): # (ordered) + def wrap(*args, **kwargs): + res = f(*args, **kwargs) + return [res[k] for k in res] + return wrap + + +# parallelism not 
needed for this example +graph = Graph(parallel=False) + +graph.add_node( + unpack_df(pvlib.solarposition.get_solarposition), + inputs=['index', 'latitude', 'longitude'], + outputs=['apparent_elevation', 'apparent_zenith', 'azimuth', + 'elevation', 'equation_of_time', 'zenith'] +) +graph.add_node( + pvlib.irradiance.extraradiation, + inputs=['index'], + outputs=['dni_extra'] +) +graph.add_node( + pvlib.atmosphere.relativeairmass, + inputs=['apparent_zenith'], + outputs=['airmass'] +) +graph.add_node( + pvlib.irradiance.haydavies, + inputs=['surface_tilt', 'surface_azimuth', 'DHI', 'DNI', + 'dni_extra', 'apparent_zenith', 'azimuth'], + outputs=['poa_sky_diffuse'] +) +graph.add_node( + pvlib.irradiance.grounddiffuse, + inputs=['surface_tilt', 'GHI', 'albedo'], + outputs=['poa_ground_diffuse'] +) +graph.add_node( + pvlib.irradiance.aoi, + inputs=['surface_tilt', 'surface_azimuth', 'apparent_zenith', 'azimuth'], + outputs=['aoi'] +) +graph.add_node( + unpack_df(pvlib.irradiance.globalinplane), + inputs=['aoi', 'DNI', 'poa_sky_diffuse', 'poa_ground_diffuse'], + outputs=['poa_global', 'poa_direct', 'poa_diffuse'] +) +graph.add_node( + unpack_df(pvlib.pvsystem.sapm_celltemp), + inputs=['poa_global', 'Wspd', 'DryBulb'], + outputs=['temp_cell', 'temp_module'] +) +graph.add_node( + pvlib.pvsystem.sapm_effective_irradiance, + inputs=['poa_direct', 'poa_diffuse', 'airmass', 'aoi', 'sandia_module'], + outputs=['effective_irradiance'], +) +graph.add_node( + unpack_dict(pvlib.pvsystem.sapm), + inputs=['effective_irradiance', 'temp_cell', 'sandia_module'], + outputs=['i_sc_sapm', 'i_mp_sapm', 'v_oc_sapm', 'v_mp_sapm', 'p_mp_sapm', 'i_x_sapm', 'i_xx_sapm'] +) +graph.add_node( + pvlib.pvsystem.calcparams_desoto, + inputs=['poa_global', 'temp_cell', 'alpha_sc', 'cec_module', 'EgRef', 'dEgdT'], + outputs=['photocurrent', 'saturation_current', 'resistance_series', + 'resistance_shunt', 'nNsVth'] +) +graph.add_node( + unpack_dict(pvlib.pvsystem.singlediode), + inputs=['photocurrent', 
'saturation_current', 'resistance_series', 'resistance_shunt', 'nNsVth'], + outputs=['i_sc_sd', 'i_mp_sd', 'v_oc_sd', 'v_mp_sd', 'p_mp_sd', 'i_x_sd', 'i_xx_sd'] +) +graph.add_node( + pvlib.pvsystem.snlinverter, + inputs=['v_mp_sapm', 'p_mp_sapm', 'inverter'], + outputs=['pac_sapm'] +) +graph.add_node( + pvlib.pvsystem.snlinverter, + inputs=['v_mp_sd', 'p_mp_sd', 'inverter'], + outputs=['pac_sd'] +) + +pprint(graph.dag) +graph.calculate(data) +print(sum(graph.data['pac_sd'])) +print(sum(graph.data['pac_sapm'])) From fb9e5633da6efc07d62ba6e6d9c16650a41a5871 Mon Sep 17 00:00:00 2001 From: Cedric Leroy Date: Tue, 3 Apr 2018 07:57:35 -0500 Subject: [PATCH 6/7] v0.5.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f1515e8..49f816f 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ setup( name='pyungo', - version='0.4.0', + version='0.5.0', description='Function dependencies resolution and execution', long_description=README, url='https://github.com/cedricleroy/pyungo', From dcc07988b94379309e5599ddede62afeffe5a37b Mon Sep 17 00:00:00 2001 From: Cedric Leroy Date: Tue, 3 Apr 2018 08:13:09 -0500 Subject: [PATCH 7/7] add documentation on parallelism --- README.rst | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/README.rst b/README.rst index 87198f8..f7db69c 100644 --- a/README.rst +++ b/README.rst @@ -56,6 +56,33 @@ in the ``graph`` instance. The result will be (a + b) / 10 - a = -1.5 +parallelism +----------- + +When resolving the dag, pyungo figures out which nodes can be run +in parallel. When creating a graph, we can specify the option +`parallel=True` for running calculations concurrently when possible, +using a thread pool from the `Python multiprocessing.dummy module <https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy>`_. +We can specify the pool size when instantiating the Graph. This will +set the maximum number of worker threads that will be launched.
If 3 nodes +can run in parallel and just 2 threads are used, pyungo will run +calculation on the first 2 nodes first and will run the last one as soon +as a thread is free. + +Instantiating a `Graph` with a pool of 5 threads for running calculations +in parallel: + +.. code-block:: python + + graph = Graph(parallel=True, pool_size=5) + + +Note: Running functions in parallel has a cost. Python will spend time +creating / deleting the worker threads. Parallelism is recommended when at +least 2 concurrent nodes have heavy calculations which take a significant +amount of time. Because the workers are threads, the gain is mostly for functions that release the GIL (I/O, NumPy and other C extensions). + + sanity check ------------