Skip to content

Commit

Permalink
Merge pull request #8 from cedricleroy/develop
Browse files Browse the repository at this point in the history
v0.5.0
  • Loading branch information
cedricleroy authored Apr 3, 2018
2 parents f4f7e5c + dcc0798 commit 17812be
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ venv*/
*.pyc
*.vscode/
*.pytest_cache/
.DS_Store
27 changes: 27 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ in the ``graph`` instance.

The result will be (a + b) / 10 - a = -1.5

parallelism
-----------

When resolving the dag, pyungo figures out which nodes can be run
in parallel. When creating a graph, we can specify the option
``parallel=True`` to run calculations concurrently when possible,
using the `Python multiprocessing module <https://docs.python.org/3.6/library/multiprocessing.html>`_.
We can specify the pool size when instantiating the Graph. This will
set the maximum number of processes that will be launched. If 3 nodes
can run in parallel and just 2 processes are used, pyungo will run
the calculation on the first 2 nodes first and will run the last one as soon
as a process becomes free.

Instantiating a ``Graph`` with a pool of 5 processes for running calculations
in parallel:

.. code-block:: python

    graph = Graph(parallel=True, pool_size=5)

Note: Running functions in parallel has a cost. Python will spend time
creating / deleting new processes. Parallelism is recommended when at
least 2 concurrent nodes have heavy calculations which take a significant
amount of time.


sanity check
------------

Expand Down
148 changes: 148 additions & 0 deletions examples/pvlib_ex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
""" Bigger example on solar PV modeling using PVLib
https://github.com/pvlib/pvlib-python
Example: https://github.com/pvlib/pvlib-python/blob/master/docs/tutorials/tmy_to_power.ipynb
"""

import inspect
import os
from pprint import pprint
import warnings
try:
import pvlib
except ImportError:
raise ImportError('This example require pvlib')
from pvlib import pvsystem
from pyungo.core import Graph


# Silence every warning category for the rest of the script.
warnings.filterwarnings("ignore")


# Locate the sample weather file inside the ``data`` directory of the
# installed pvlib package, then read it together with its metadata.
pvlib_abspath = os.path.dirname(os.path.abspath(inspect.getfile(pvlib)))
datapath = os.path.join(pvlib_abspath, 'data', '703165TY.csv')
tmy_data, meta = pvlib.tmy.readtmy3(datapath, coerce_year=2015)
tmy_data.index.name = 'Time'
# Move the time index back by 30 minutes.
# NOTE(review): presumably to label each record at the middle of its
# interval, as in the tutorial linked in the module docstring -- confirm.
tmy_data = tmy_data.shift(freq='-30Min')
# Parameter tables for modules and inverters, plus the single inverter
# used by both inverter nodes of the graph.
sandia_modules = pvlib.pvsystem.retrieve_sam(name='SandiaMod')
cec_modules = pvlib.pvsystem.retrieve_sam(name='CECMod')
sapm_inverters = pvlib.pvsystem.retrieve_sam('sandiainverter')
sapm_inverter = sapm_inverters['ABB__MICRO_0_25_I_OUTD_US_208_208V__CEC_2014_']

# Initial inputs passed to ``graph.calculate``. Each key is a name that the
# graph nodes refer to in their ``inputs`` lists.
data = {
'index': tmy_data.index,
'surface_tilt': 30,
'surface_azimuth': 180,
'DHI': tmy_data['DHI'],
'DNI': tmy_data['DNI'],
'GHI': tmy_data['GHI'],
'Wspd': tmy_data['Wspd'],
'DryBulb': tmy_data['DryBulb'],
'albedo': 0.2,
'latitude': meta['latitude'],
'longitude': meta['longitude'],
'sandia_module': sandia_modules.Canadian_Solar_CS5P_220M___2009_,
'cec_module': cec_modules.Canadian_Solar_CS5P_220M,
'alpha_sc': cec_modules.Canadian_Solar_CS5P_220M['alpha_sc'],
'EgRef': 1.121,
'dEgdT': -0.0002677,
'inverter': sapm_inverter
}


def unpack_df(f):
    """Decorate ``f`` so its result is returned as ``result.values.T``.

    The wrapper forwards every positional and keyword argument to ``f``
    unchanged. The object returned by ``f`` must expose a ``values``
    attribute that itself has a ``T`` attribute (presumably a pandas
    DataFrame, given the name -- the transposed array then holds one row
    per column, which can be spread over several graph outputs).

    ``functools.wraps`` copies the name and docstring of ``f`` onto the
    wrapper, so code that introspects ``__name__`` sees the original
    function name instead of ``wrap``.
    """
    from functools import wraps

    @wraps(f)
    def wrap(*args, **kwargs):
        res = f(*args, **kwargs)
        return res.values.T
    return wrap


def unpack_dict(f):  # (ordered)
    """Decorate ``f`` so its keyed result is returned as a list of values.

    The wrapper forwards every positional and keyword argument to ``f`` and
    returns ``[result[k] for k in result]``: the values in the iteration
    order of the result. That order decides which value lands in which
    graph output, hence the "ordered" requirement on the result.

    ``functools.wraps`` copies the name and docstring of ``f`` onto the
    wrapper, so code that introspects ``__name__`` sees the original
    function name instead of ``wrap``.
    """
    from functools import wraps

    @wraps(f)
    def wrap(*args, **kwargs):
        res = f(*args, **kwargs)
        # Index by iterated key instead of calling ``.values()``: this only
        # requires iteration and ``__getitem__`` from the result object.
        return [res[k] for k in res]
    return wrap


# parallelism not needed for this example
graph = Graph(parallel=False)

# Each node below names the data it consumes (``inputs``) and the names under
# which its results are stored (``outputs``); outputs of one node feed the
# inputs of later ones.

# Solar position from the time index and site coordinates. The result goes
# through ``unpack_df`` so that it can be split over six outputs.
# NOTE(review): the output names are assumed to follow the column order of
# the returned table -- confirm against the installed pvlib version.
graph.add_node(
unpack_df(pvlib.solarposition.get_solarposition),
inputs=['index', 'latitude', 'longitude'],
outputs=['apparent_elevation', 'apparent_zenith', 'azimuth',
'elevation', 'equation_of_time', 'zenith']
)
# ``dni_extra`` only depends on the time index.
graph.add_node(
pvlib.irradiance.extraradiation,
inputs=['index'],
outputs=['dni_extra']
)
graph.add_node(
pvlib.atmosphere.relativeairmass,
inputs=['apparent_zenith'],
outputs=['airmass']
)
# Sky and ground diffuse components, later combined by ``globalinplane``.
graph.add_node(
pvlib.irradiance.haydavies,
inputs=['surface_tilt', 'surface_azimuth', 'DHI', 'DNI',
'dni_extra', 'apparent_zenith', 'azimuth'],
outputs=['poa_sky_diffuse']
)
graph.add_node(
pvlib.irradiance.grounddiffuse,
inputs=['surface_tilt', 'GHI', 'albedo'],
outputs=['poa_ground_diffuse']
)
graph.add_node(
pvlib.irradiance.aoi,
inputs=['surface_tilt', 'surface_azimuth', 'apparent_zenith', 'azimuth'],
outputs=['aoi']
)
graph.add_node(
unpack_df(pvlib.irradiance.globalinplane),
inputs=['aoi', 'DNI', 'poa_sky_diffuse', 'poa_ground_diffuse'],
outputs=['poa_global', 'poa_direct', 'poa_diffuse']
)
graph.add_node(
unpack_df(pvlib.pvsystem.sapm_celltemp),
inputs=['poa_global', 'Wspd', 'DryBulb'],
outputs=['temp_cell', 'temp_module']
)
# First modelling branch: the ``*_sapm`` outputs, built on ``sandia_module``.
graph.add_node(
pvlib.pvsystem.sapm_effective_irradiance,
inputs=['poa_direct', 'poa_diffuse', 'airmass', 'aoi', 'sandia_module'],
outputs=['effective_irradiance'],
)
graph.add_node(
unpack_dict(pvlib.pvsystem.sapm),
inputs=['effective_irradiance', 'temp_cell', 'sandia_module'],
outputs=['i_sc_sapm', 'i_mp_sapm', 'v_oc_sapm', 'v_mp_sapm', 'p_mp_sapm', 'i_x_sapm', 'i_xx_sapm']
)
# Second modelling branch: the ``*_sd`` outputs, built on ``cec_module``.
graph.add_node(
pvlib.pvsystem.calcparams_desoto,
inputs=['poa_global', 'temp_cell', 'alpha_sc', 'cec_module', 'EgRef', 'dEgdT'],
outputs=['photocurrent', 'saturation_current', 'resistance_series',
'resistance_shunt', 'nNsVth']
)
graph.add_node(
unpack_dict(pvlib.pvsystem.singlediode),
inputs=['photocurrent', 'saturation_current', 'resistance_series', 'resistance_shunt', 'nNsVth'],
outputs=['i_sc_sd', 'i_mp_sd', 'v_oc_sd', 'v_mp_sd', 'p_mp_sd', 'i_x_sd', 'i_xx_sd']
)
# The same inverter function is registered twice, once per branch.
graph.add_node(
pvlib.pvsystem.snlinverter,
inputs=['v_mp_sapm', 'p_mp_sapm', 'inverter'],
outputs=['pac_sapm']
)
graph.add_node(
pvlib.pvsystem.snlinverter,
inputs=['v_mp_sd', 'p_mp_sd', 'inverter'],
outputs=['pac_sd']
)

# Print the resolved node ordering, run the graph on ``data`` and print the
# sum of each inverter output.
pprint(graph.dag)
graph.calculate(data)
print(sum(graph.data['pac_sd']))
print(sum(graph.data['pac_sapm']))
64 changes: 59 additions & 5 deletions pyungo/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
from functools import reduce
from copy import deepcopy
import datetime as dt
from functools import reduce
import logging
from multiprocessing.dummy import Pool as ThreadPool


# NOTE(review): ``logging.getLogger()`` called without a name returns the
# root logger, so importing this module configures logging for the whole
# process: ``basicConfig`` may install a handler and the root level is forced
# to INFO. Libraries usually use ``logging.getLogger(__name__)`` and leave
# configuration to the application -- consider changing in a follow-up.
logging.basicConfig()
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


class PyungoError(Exception):
Expand Down Expand Up @@ -42,7 +50,11 @@ def __repr__(self):
)

def __call__(self, args, **kwargs):
    """Run the wrapped function and log how long the call took.

    Args:
        args: iterable of positional arguments, unpacked into the call.
        **kwargs: keyword arguments forwarded unchanged.

    Returns:
        Whatever the wrapped function returns.
    """
    # No early return before the timing code: the duration must be measured
    # and logged around the one and only call to the wrapped function.
    t1 = dt.datetime.utcnow()
    res = self._fct(*args, **kwargs)
    t2 = dt.datetime.utcnow()
    LOGGER.info('Ran {} in {}'.format(self, t2-t1))
    return res

@property
def id(self):
Expand All @@ -67,11 +79,20 @@ def output_names(self):
def fct_name(self):
return self._fct.__name__

def load_inputs(self, data_to_pass, kwargs_to_pass):
    """Store the positional and keyword inputs for a deferred run.

    Nothing is executed here; the two objects are only kept on the instance
    (by reference, without copying) until the node is run.
    """
    self._data_to_pass, self._kwargs_to_pass = data_to_pass, kwargs_to_pass

def run_with_loaded_inputs(self):
    """Call this object with the inputs previously stored on it.

    Reads ``_data_to_pass`` and ``_kwargs_to_pass`` from the instance and
    returns the result of calling the instance with them.
    """
    positional = self._data_to_pass
    keywords = self._kwargs_to_pass
    return self(positional, **keywords)


class Graph:
def __init__(self):
def __init__(self, parallel=False, pool_size=2):
self._nodes = []
self._data = None
self._parallel = parallel
self._pool_size = pool_size

@property
def data(self):
Expand Down Expand Up @@ -100,11 +121,19 @@ def dag(self):
ordered_nodes.append(nodes)
return ordered_nodes

@staticmethod
def run_node(node):
return (node.id, node.run_with_loaded_inputs())

def _register(self, f, **kwargs):
    """Validate the node options and create a node for ``f``.

    Keyword Args:
        inputs: required and non-empty.
        outputs: required and non-empty.
        args: optional, passed through as given (``None`` when absent).
        kwargs: optional, passed through as given (``None`` when absent).

    Raises:
        PyungoError: if ``inputs`` or ``outputs`` is missing or empty.
    """
    input_names = kwargs.get('inputs')
    if not input_names:
        raise PyungoError('Missing inputs parameter')
    # ``outputs`` is read once, here, so the validated value is the one
    # that reaches ``_create_node``.
    output_names = kwargs.get('outputs')
    if not output_names:
        raise PyungoError('Missing outputs parameters')
    args_names = kwargs.get('args')
    kwargs_names = kwargs.get('kwargs')
    self._create_node(
        f, input_names, output_names, args_names, kwargs_names
    )
Expand Down Expand Up @@ -160,11 +189,14 @@ def _check_inputs(self, data):
raise PyungoError(msg)

def calculate(self, data):
t1 = dt.datetime.utcnow()
LOGGER.info('Starting calculation...')
self._data = deepcopy(data)
self._check_inputs(data)
dep = self._dependencies()
sorted_dep = topological_sort(dep)
for items in sorted_dep:
# loading node with inputs
for item in items:
node = self._get_node(item)
args = [i_name for i_name in node.input_names if i_name not in node.kwargs]
Expand All @@ -174,10 +206,32 @@ def calculate(self, data):
kwargs_to_pass = {}
for kwarg in node.kwargs:
kwargs_to_pass[kwarg] = self._data[kwarg]
res = node(data_to_pass, **kwargs_to_pass)
node.load_inputs(data_to_pass, kwargs_to_pass)
# running nodes
if self._parallel:
pool = ThreadPool(self._pool_size)
results = pool.map(
Graph.run_node,
[self._get_node(i) for i in items]
)
pool.close()
pool.join()
results = {k: v for k, v in results}
else:
results = {}
for item in items:
node = self._get_node(item)
res = node.run_with_loaded_inputs()
results[node.id] = res
# save results
for item in items:
node = self._get_node(item)
res = results[node.id]
if len(node.output_names) == 1:
self._data[node.output_names[0]] = res
else:
for i, out in enumerate(node.output_names):
self._data[out] = res[i]
t2 = dt.datetime.utcnow()
LOGGER.info('Calculation finished in {}'.format(t2-t1))
return res
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

setup(
name='pyungo',
version='0.4.0',
version='0.5.0',
description='Function dependencies resolution and execution',
long_description=README,
url='https://github.com/cedricleroy/pyungo',
Expand Down
43 changes: 42 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,31 @@ def f_my_function2(c):
assert graph.data['e'] == -1.5


def test_simple_parralel():
    """Run a small five-node graph with ``parallel=True``.

    TODO: We could mock and make sure things are called correctly
    """

    def f_my_function(a, b):
        return a + b

    def f_my_function3(d, a):
        return d - a

    def f_my_function2(c):
        return c / 10.

    graph = Graph(parallel=True)

    # (function, inputs, outputs), registered in this exact order.
    registrations = [
        (f_my_function, ['a', 'b'], ['c']),
        (f_my_function3, ['d', 'a'], ['e']),
        (f_my_function2, ['c'], ['d']),
        (f_my_function2, ['c'], ['f']),
        (f_my_function2, ['c'], ['g']),
    ]
    for fct, in_names, out_names in registrations:
        graph.add_node(fct, inputs=in_names, outputs=out_names)

    initial = {'a': 2, 'b': 3}
    assert graph.calculate(data=initial) == -1.5


def test_multiple_outputs():
graph = Graph()

Expand Down Expand Up @@ -129,7 +154,7 @@ def f_my_function2(c):

with pytest.raises(PyungoError) as err:
graph.calculate(data={'a': 6, 'b': 4})

assert "A cyclic dependency exists amongst" in str(err.value)


Expand Down Expand Up @@ -199,3 +224,19 @@ def f_my_function2(c):
dag = graph.dag
for i, fct_name in enumerate(expected):
assert dag[i][0].fct_name == fct_name


def test_missing_inputs():
    """``add_node`` must reject a node declared without inputs or outputs."""

    graph = Graph(parallel=True)

    def f_my_function(a, b):
        return a + b

    # The message checks sit after each ``with`` block on purpose: a
    # statement placed inside the block after the raising call never runs.
    with pytest.raises(PyungoError) as err:
        graph.add_node(f_my_function, outputs=['c'])
    assert "Missing inputs parameter" in str(err.value)

    with pytest.raises(PyungoError) as err:
        graph.add_node(f_my_function, inputs=['a', 'b'])
    assert "Missing outputs parameter" in str(err.value)

0 comments on commit 17812be

Please sign in to comment.