Skip to content

Commit

Permalink
first push
Browse files Browse the repository at this point in the history
  • Loading branch information
szadkrud committed Jun 28, 2022
1 parent 783c429 commit 722135c
Show file tree
Hide file tree
Showing 67 changed files with 7,920 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# pycache bussiness
*__pycache__/*
*.pyc

# result binaries
results/*
Empty file added dynsys_framework/__init__.py
Empty file.
Empty file.
53 changes: 53 additions & 0 deletions dynsys_framework/dynamic_system/expression_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from dynsys_framework.dynamic_system.process import Process


class ExpressionTree:
def __init__(self, processes):
"""
:param processes:
"""
assert all([isinstance(p, Process) for p in processes]), "all processes should be of class Process"

self._output_names = []
self._all_input_names = []
for pi in processes:
self._output_names += pi.output_names
self._all_input_names += pi.input_names
assert len(set(self._output_names)) == len(self._output_names), \
"there are duplicates in outputs {}".format(self._output_names)

self._input_names = [inp for inp in self._all_input_names if not inp in self._output_names]

self._processes = Process.dependence_ordering(processes)

for i in range(len(self._processes)):
for j in range(i + 1, len(self._processes)):
assert not self._processes[i].is_depending_on(self._processes[j]), \
"Possible cyclic dependency in expression processes {} -> {}\n process order [{}]".format(
self._processes[i], self._processes[j], "\n".join([str(_p) for _p in self._processes]))

def set_functions(self, functions):
for process in self._processes:
process.set_function(functions)

def get_input_names(self):
"""
Input variable names required by this expression tree.
:return:
"""
return self._input_names

def get_output_names(self):
"""
All output variable names provided by this expression tree.
:return:
"""
return self._output_names

def __call__(self, state, *args, **kwargs):
for pi in self._processes:
values = pi(state)
names = pi.output_names
for nv in zip(names, values):
state[nv[0]] = nv[1]
return state
157 changes: 157 additions & 0 deletions dynsys_framework/dynamic_system/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from dynsys_framework.dynamic_system.process import Process
from dynsys_framework.dynamic_system.expression_tree import ExpressionTree


class Model:
def __init__(self, functions, processes: [Process]):
"""
:param functions: dictionary string -> fun(vars)->dvars
:param process_declarations:
"""
for p in processes:
assert isinstance(p, Process), "all processes should be of class Process. {} is not Process".format(p)
assert isinstance(functions, dict), "functions is a dictionary fun_name->callable"
# assert all([callable(f) for f in functions.values()]), "all functions values must be callable"
for f in functions:
assert callable(functions[f]), "function {} is not callable".format(f)

# assert all(p.function_name in functions.keys() for p in processes), "required function not found in functions"
for p in processes:
assert p.function_name in functions.keys(), "function {} is not in {}".format(
p.function_name, functions.keys())

self._processes = processes
self._differential_processes = []
self._expression_processes = []
for pi in processes:
if pi.is_differential():
self._differential_processes.append(pi)
else:
self._expression_processes.append(pi)

self._expression_tree = ExpressionTree(self._expression_processes)

self._diff_output_variable_names = [name
for process in self._differential_processes
for name in process.output_names]

self._intg_output_variable_names = [Process.variable_name_integrated(name)
for name in self._diff_output_variable_names]

self._diff_to_intg = dict(zip(self._diff_output_variable_names, self._intg_output_variable_names))

self._expr_output_variable_names = [name
for process in self._expression_processes
for name in process.output_names]

self._clean_output_names = self._diff_output_variable_names + self._expr_output_variable_names

self._output_names = self._clean_output_names + self._intg_output_variable_names

assert len(self._output_names) == len(set(self._output_names)), \
"there is a duplicate in output_names: {}".format(self._output_names)
self._output_names = set(self._output_names)

self._diff_input_variable_names = [name
for process in self._differential_processes
for name in process.input_names]

self._expr_input_variable_names = [name
for process in self._expression_processes
for name in process.input_names]

self._input_names = set(self._diff_input_variable_names + self._expr_input_variable_names)

self._required_state_variables = set(self.required_variable_binding() + self.required_variable_initialization())

self.functions = functions

for pd in self._processes:
pd.set_function(self.functions)

def input_names(self):
"""
Input names for all processes.
:return:
"""
return self._input_names

def output_names(self):
"""
Output names of all processes and with integrated derivative variables.
:return:
"""
return self._output_names

def expression_output_names(self):
return self._expr_output_variable_names

def derivative_output_names(self):
return self._diff_output_variable_names

def integrated_output_names(self):
return self._intg_output_variable_names

def clean_output_names(self):
"""
Output names of all processes.
:return:
"""
return self._clean_output_names

def required_variable_binding(self):
"""
Variable names that must be given from external source.
:return:
"""
return [name for name in self._input_names if not name in self._output_names]

def required_variable_initialization(self):
"""
Variable names that must be initialised.
:return:
"""
return self._intg_output_variable_names

def get_derivative_to_integrated_dictionary(self):
"""
Dictionary derivated_variable_name -> (integrated)variable_name.
:return:
"""
return self._diff_to_intg

def __str__(self):
ret = "------------------------FUNCTIONS({}):\n".format(len(self.functions))
ret += "; \n".join(["{}:= {}".format(k, self.functions[k]) for k in self.functions])
ret += "\n------------------------PROCESSES({}):\n".format(len(self._processes))
ret += "EXPRESSIONS:\n"
ret += "; \n".join([str(proc) for proc in self._expression_tree._processes])
ret += "\nDERIVATIVES:\n"
ret += "; \n".join([str(proc) for proc in self._differential_processes])
ret += "\n------------------------VARIABLES({}):".format(
len(self._intg_output_variable_names) +
len(self.required_variable_binding()) +
len(self._diff_output_variable_names) +
len(self.expression_output_names())
)
ret += "\nINTEGRATED:{}".format(self._intg_output_variable_names)
ret += "\nEXTERNAL:{}".format(self.required_variable_binding())
ret += "\nDERIVATIVES:{}".format(self._diff_output_variable_names)
ret += "\nEXPRESSIONS:{}".format(self.expression_output_names())
return ret

def _eval_differential_processes(self, state):
names = []
values = []

for pi in self._differential_processes:
values += pi(state)
names += pi.output_names
return dict(zip(names, values))

def __call__(self, state, **kwargs):
_state = dict((k, state[k]) for k in state)
_state = self._expression_tree(_state)
_state = {**_state, **self._eval_differential_processes(_state)}
return _state

96 changes: 96 additions & 0 deletions dynsys_framework/dynamic_system/model_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from dynsys_framework.dynamic_system.model import Model


class ModelExecutor:
"""
Binds external input into the model and executes it when called.
Each call executor collects external values, evaluates expressions and integrates differential equations.
"""

def __init__(self, model: Model, solver, external_binders):
assert isinstance(model, Model), "model must be instance of Model"
assert isinstance(external_binders, dict), "external_binders must be a dict str->callable"
assert all([callable(b) for b in external_binders.values()]), "external_binders values must be callable"
assert callable(solver), "solver must be callable"

self._model = model
self._solver = solver
self._external_binders = external_binders

self._all_model_inputs = list(model.input_names())

# collecting names of all variables that will be updated

self._external_variable_names = list(external_binders.keys())
self._expression_names = model.expression_output_names()
self._derivative_variable_names = model.derivative_output_names()
self._integrated_variable_names = model.integrated_output_names()

self._available_variable_names = self._external_variable_names + self._expression_names + \
self._derivative_variable_names + self._integrated_variable_names

# preparing storage for variable values - these four defines state at time 't'

self._external_variable_values = dict((name, None) for name in self._external_variable_names)
self._integrated_variable_values = dict((name, None) for name in self._integrated_variable_names)
self._expression_values = dict((name, None) for name in self._expression_names)
self._derivative_variable_values = dict((name, None) for name in self._derivative_variable_names)

# this dictionary will store new integrated variable values computed by solver from state at time 't'
self._next_integrated_variable_values = dict((name, None) for name in self._integrated_variable_names)

assert len(self._available_variable_names) == len(set(self._available_variable_names)), \
"there are duplicates in available_variables:" + str(self._available_variable_names)

for required_var in self._all_model_inputs:
assert required_var in self._available_variable_names, "required variable {} is not available in {}" \
.format(required_var, self._available_variable_names)

def initialize_integrating_variables(self, init_state):
for k in self._integrated_variable_values:
if k in init_state:
self._next_integrated_variable_values[k] = init_state[k]
else:
raise KeyError("Missing variable name {} in init_state {}".format(k, init_state))

def _set_internal_variable_values(self, internal_variable_values):
for k in self._integrated_variable_values:
self._integrated_variable_values[k] = self._next_integrated_variable_values[k]
self._next_integrated_variable_values[k] = internal_variable_values[k]
for k in self._expression_values: # needed only for log purposes
self._expression_values[k] = internal_variable_values[k]
for k in self._derivative_variable_values: # needed only for log purposes
self._derivative_variable_values[k] = internal_variable_values[k]
# _external_variable_values are updated in call

def _update_external_variable_values(self):
# TODO: policies for "waiting"
for k in self._external_variable_values:
self._external_variable_values[k] = self._external_binders[k]()

def get_external_variable_values(self):
return self._external_variable_values

def get__next_integrated_variable_values(self):
return self._next_integrated_variable_values

def get_last_state(self):
return {
**self._integrated_variable_values,
**self._external_variable_values,
**self._expression_values,
**self._derivative_variable_values
}

def __str__(self):
ret = "\n------------------------EXTERNAL BINDING({}):\n".format(len(self._external_binders))
ret += ";\n".join(["{}:= {}".format(k, self._external_binders[k]) for k in self._external_binders])
ret += "\n"
ret += str(self._model)
return ret

def __call__(self, *args, **kwargs):
self._update_external_variable_values()
state = {**self._next_integrated_variable_values, **self._external_variable_values}
self._set_internal_variable_values(
self._solver(self._model, state, self._model.get_derivative_to_integrated_dictionary()))
Loading

0 comments on commit 722135c

Please sign in to comment.