From 82146ad6abdceccfc7d675176e04ced6712fecc1 Mon Sep 17 00:00:00 2001 From: Kevin CO Date: Mon, 24 Feb 2025 18:53:20 -0500 Subject: [PATCH] Refactor in progess for new OcpFes class --- cocofest/integration/ivp_fes.py | 4 +- cocofest/models/ding2003.py | 36 +- cocofest/models/dynamical_model.py | 2 +- cocofest/models/hmed2018.py | 4 +- cocofest/models/hmed2018_with_fatigue.py | 2 +- .../optimization/fes_identification_ocp.py | 12 +- cocofest/optimization/fes_nmpc.py | 4 +- cocofest/optimization/fes_ocp.py | 444 ++---------------- cocofest/optimization/fes_ocp_dynamics.py | 6 +- ...g_with_different_driven_methods_in_nmpc.py | 2 +- .../force_tracking_parameter_optimization.py | 103 ++-- .../pulse_duration_optimization.py | 60 ++- ...pulse_duration_optimization_nmpc_cyclic.py | 8 +- .../pulse_intensity_optimization.py | 67 ++- 14 files changed, 254 insertions(+), 500 deletions(-) diff --git a/cocofest/integration/ivp_fes.py b/cocofest/integration/ivp_fes.py index fdfd66a..512ba4a 100644 --- a/cocofest/integration/ivp_fes.py +++ b/cocofest/integration/ivp_fes.py @@ -334,12 +334,12 @@ def build_initial_guess_from_ocp(self, ocp, stim_idx_at_node_list=None): ) for i in range(self.n_shooting) ] - for j in range(self.model._sum_stim_truncation) + for j in range(self.model.sum_stim_truncation) ] else: pi = self.pulse_intensity[0] if isinstance(self.pulse_intensity, list) else self.pulse_width - initial_guess_list = [[pi] * self.model._sum_stim_truncation] * self.n_shooting + initial_guess_list = [[pi] * self.model.sum_stim_truncation] * self.n_shooting u.add(key, initial_guess=initial_guess_list, phase=0, interpolation=InterpolationType.EACH_FRAME) diff --git a/cocofest/models/ding2003.py b/cocofest/models/ding2003.py index 658b903..ded04a4 100644 --- a/cocofest/models/ding2003.py +++ b/cocofest/models/ding2003.py @@ -1,4 +1,6 @@ from typing import Callable +from math import gcd +from fractions import Fraction import numpy as np from casadi import MX, exp, vertcat @@ -40,7 +42,7 @@ def __init__( super().__init__() self._model_name = model_name self._muscle_name = muscle_name - self._sum_stim_truncation = sum_stim_truncation + self.sum_stim_truncation = sum_stim_truncation self._with_fatigue = False self.pulse_apparition_time = None self.stim_time = stim_time if stim_time else [] @@ -392,12 +394,12 @@ def declare_ding_variables( ConfigureProblem.configure_dynamics_function(ocp, nlp, dyn_func=self.dynamics) def _get_additional_previous_stim_time(self): - while len(self.previous_stim["time"]) < self._sum_stim_truncation: + while len(self.previous_stim["time"]) < self.sum_stim_truncation: self.previous_stim["time"].insert(0, -10000000) return self.previous_stim def get_numerical_data_time_series(self, n_shooting, final_time, all_stim_time=None): - truncation = self._sum_stim_truncation + truncation = self.sum_stim_truncation # --- Set the previous stim time for the numerical data time series (mandatory to avoid nan values) --- # self.previous_stim = self._get_additional_previous_stim_time() stim_time = ( @@ -427,3 +429,31 @@ def get_numerical_data_time_series(self, n_shooting, final_time, all_stim_time=N stim_time_array = np.transpose(temp_result, (2, 1, 0)) return {"stim_time": stim_time_array}, stim_idx_at_node_list + + def get_n_shooting(self, final_time: float) -> int: + """ + Prepare the n_shooting for the ocp in order to have a time step that is a multiple of the stimulation time. + + Returns + ------- + int + The number of shooting points + """ + # Represent the final time as a Fraction for exact arithmetic. + T_final = Fraction(final_time).limit_denominator() + n_shooting = 1 + + for t in self.stim_time: + + t_frac = Fraction(t).limit_denominator() # Convert the stimulation time to an exact fraction. + norm = t_frac / T_final # Compute the normalized time: t / final_time. + d = norm.denominator # The denominator in the reduced fraction gives the requirement. + n_shooting = n_shooting * d // gcd(n_shooting, d) + + if n_shooting >= 1000: + print( + f"Warning: The number of shooting nodes is very high n = {n_shooting}.\n" + "The optimization might be long, consider using stimulation time with even spacing (common frequency)." + ) + + return n_shooting diff --git a/cocofest/models/dynamical_model.py b/cocofest/models/dynamical_model.py index 7bb1144..a8a98b1 100644 --- a/cocofest/models/dynamical_model.py +++ b/cocofest/models/dynamical_model.py @@ -436,7 +436,7 @@ def declare_model_variables( StateConfigure().configure_last_pulse_width(ocp, nlp, muscle_name=str(muscle_model.muscle_name)) if isinstance(muscle_model, DingModelPulseIntensityFrequency): StateConfigure().configure_pulse_intensity( - ocp, nlp, muscle_name=str(muscle_model.muscle_name), truncation=muscle_model._sum_stim_truncation + ocp, nlp, muscle_name=str(muscle_model.muscle_name), truncation=muscle_model.sum_stim_truncation ) if self.activate_residual_torque: ConfigureProblem.configure_tau(ocp, nlp, as_states=False, as_controls=True) diff --git a/cocofest/models/hmed2018.py b/cocofest/models/hmed2018.py index 98555ef..67812b8 100644 --- a/cocofest/models/hmed2018.py +++ b/cocofest/models/hmed2018.py @@ -296,7 +296,7 @@ def declare_ding_variables( A list of values to pass to the dynamics at each node. Experimental external forces should be included here. """ StateConfigure().configure_all_fes_model_states(ocp, nlp, fes_model=self) - StateConfigure().configure_pulse_intensity(ocp, nlp, truncation=self._sum_stim_truncation) + StateConfigure().configure_pulse_intensity(ocp, nlp, truncation=self.sum_stim_truncation) ConfigureProblem.configure_dynamics_function(ocp, nlp, dyn_func=self.dynamics) def min_pulse_intensity(self): @@ -309,7 +309,7 @@ def min_pulse_intensity(self): return (np.arctanh(-self.cr) / self.bs) + self.Is def _get_additional_previous_stim_time(self): - while len(self.previous_stim["time"]) < self._sum_stim_truncation: + while len(self.previous_stim["time"]) < self.sum_stim_truncation: self.previous_stim["time"].insert(0, -10000000) self.previous_stim["pulse_intensity"].insert(0, 50) return self.previous_stim diff --git a/cocofest/models/hmed2018_with_fatigue.py b/cocofest/models/hmed2018_with_fatigue.py index a2e11e4..d66dac4 100644 --- a/cocofest/models/hmed2018_with_fatigue.py +++ b/cocofest/models/hmed2018_with_fatigue.py @@ -312,5 +312,5 @@ def declare_ding_variables( A list of values to pass to the dynamics at each node. Experimental external forces should be included here. """ StateConfigure().configure_all_fes_model_states(ocp, nlp, fes_model=self) - StateConfigure().configure_pulse_intensity(ocp, nlp, truncation=self._sum_stim_truncation) + StateConfigure().configure_pulse_intensity(ocp, nlp, truncation=self.sum_stim_truncation) ConfigureProblem.configure_dynamics_function(ocp, nlp, dyn_func=self.dynamics) diff --git a/cocofest/optimization/fes_identification_ocp.py b/cocofest/optimization/fes_identification_ocp.py index 53361fc..c3644f4 100644 --- a/cocofest/optimization/fes_identification_ocp.py +++ b/cocofest/optimization/fes_identification_ocp.py @@ -105,8 +105,8 @@ def prepare_ocp( numerical_data_time_series, stim_idx_at_node_list = model.get_numerical_data_time_series(n_shooting, final_time) - dynamics = OcpFesId._declare_dynamics(model=model, numerical_data_timeseries=numerical_data_time_series) - x_bounds, x_init = OcpFesId._set_bounds( + dynamics = OcpFesId.declare_dynamics(model=model, numerical_data_timeseries=numerical_data_time_series) + x_bounds, x_init = OcpFesId.set_x_bounds( model=model, force_tracking=objective["force_tracking"], discontinuity_in_ocp=discontinuity_in_ocp, @@ -118,7 +118,7 @@ def prepare_ocp( if isinstance(model, DingModelPulseWidthFrequency) else pulse_intensity["fixed"] if isinstance(model, DingModelPulseIntensityFrequency) else None ) - u_bounds, u_init = OcpFesId._set_u_bounds( + u_bounds, u_init = OcpFesId.set_u_bounds( model=model, control_value=control_value, stim_idx_at_node_list=stim_idx_at_node_list, n_shooting=n_shooting ) @@ -179,7 +179,7 @@ def _sanity_check_id( ) @staticmethod - def _set_bounds( + def set_x_bounds( model: FesModel = None, force_tracking=None, discontinuity_in_ocp=None, @@ -310,7 +310,7 @@ def _set_phase_transition(discontinuity_in_ocp): return phase_transitions @staticmethod - def _set_u_bounds(model, control_value: list, stim_idx_at_node_list: list, n_shooting: int): + def set_u_bounds(model, control_value: list, stim_idx_at_node_list: list, n_shooting: int): # Controls bounds u_bounds = BoundsList() # Controls initial guess @@ -339,7 +339,7 @@ def _set_u_bounds(model, control_value: list, stim_idx_at_node_list: list, n_sho ) for i in range(n_shooting) ] - for j in range(model._sum_stim_truncation) + for j in range(model.sum_stim_truncation) ] u_init.add(key="pulse_intensity", initial_guess=np.array(control_list)[:, 0], phase=0) diff --git a/cocofest/optimization/fes_nmpc.py b/cocofest/optimization/fes_nmpc.py index 36fcd51..1545ce3 100644 --- a/cocofest/optimization/fes_nmpc.py +++ b/cocofest/optimization/fes_nmpc.py @@ -67,7 +67,7 @@ def advance_window_initial_guess_parameters(self, sol, **advance_options): self.parameter_init[key].init[:, :] = reshaped_parameter[self.initial_guess_param_index, :] def update_stim(self): - truncation_term = self.nlp[0].model._sum_stim_truncation + truncation_term = self.nlp[0].model.sum_stim_truncation solution_stimulation_time = self.nlp[0].model.stim_time[-truncation_term:] previous_stim_time = [x - self.phase_time[0] for x in solution_stimulation_time] previous_stim = {"time": previous_stim_time} @@ -143,7 +143,7 @@ def create_model_from_list(self, models: list): stim_time = [val for sublist in stim_time for val in sublist] combined_model = DingModelPulseWidthFrequencyWithFatigue( - stim_time=stim_time, sum_stim_truncation=self.nlp[0].model._sum_stim_truncation + stim_time=stim_time, sum_stim_truncation=self.nlp[0].model.sum_stim_truncation ) return combined_model diff --git a/cocofest/optimization/fes_ocp.py b/cocofest/optimization/fes_ocp.py index 78e5d16..9d5ad02 100644 --- a/cocofest/optimization/fes_ocp.py +++ b/cocofest/optimization/fes_ocp.py @@ -1,29 +1,20 @@ import numpy as np -from math import gcd -from fractions import Fraction from bioptim import ( BoundsList, ConstraintList, - ControlType, DynamicsList, InitialGuessList, InterpolationType, Node, - Objective, ObjectiveFcn, ObjectiveList, - OdeSolver, - OptimalControlProgram, ParameterList, - ParameterObjectiveList, PhaseDynamics, VariableScaling, ) from ..fourier_approx import FourierSeries -from ..models.fes_model import FesModel -from ..models.dynamical_model import FesMskModel from ..models.ding2007 import DingModelPulseWidthFrequency from ..models.hmed2018 import DingModelPulseIntensityFrequency from ..custom_constraints import CustomConstraint @@ -36,400 +27,42 @@ class OcpFes: """ @staticmethod - def _prepare_optimization_problem(input_dict: dict) -> dict: - - (pulse_width, pulse_intensity, objective) = OcpFes._fill_dict( - input_dict["pulse_width"], - input_dict["pulse_intensity"], - input_dict["objective"], - ) - - OcpFes._sanity_check( - model=input_dict["model"], - n_shooting=input_dict["n_shooting"], - final_time=input_dict["final_time"], - objective=objective, - use_sx=input_dict["use_sx"], - ode_solver=input_dict["ode_solver"], - n_threads=input_dict["n_threads"], - ) - - (parameters, parameters_bounds, parameters_init, parameter_objectives) = OcpFes._build_parameters( - model=input_dict["model"], - pulse_width=pulse_width, - pulse_intensity=pulse_intensity, - use_sx=input_dict["use_sx"], - ) - - numerical_data_time_series, stim_idx_at_node_list = input_dict["model"].get_numerical_data_time_series( - input_dict["n_shooting"], input_dict["final_time"] - ) - - dynamics = OcpFes._declare_dynamics(input_dict["model"], numerical_data_time_series) - x_bounds, x_init = OcpFes._set_bounds(input_dict["model"]) - - max_bound = ( - pulse_width["max"] - if isinstance(input_dict["model"], DingModelPulseWidthFrequency) - else pulse_intensity["max"] if isinstance(input_dict["model"], DingModelPulseIntensityFrequency) else None - ) - u_bounds, u_init = OcpFes._set_u_bounds(input_dict["model"], max_bound=max_bound) - - objective_functions = OcpFes._set_objective(input_dict["n_shooting"], objective) - - constraints = ConstraintList() - if isinstance(input_dict["model"], DingModelPulseIntensityFrequency): - constraints = OcpFes._build_constraints( - input_dict["model"], - input_dict["n_shooting"], - stim_idx_at_node_list, - ) - OcpFes.update_model_param(input_dict["model"], parameters) - - optimization_dict = { - "model": input_dict["model"], - "dynamics": dynamics, - "n_shooting": input_dict["n_shooting"], - "final_time": input_dict["final_time"], - "objective_functions": objective_functions, - "x_init": x_init, - "x_bounds": x_bounds, - "u_bounds": u_bounds, - "u_init": u_init, - "constraints": constraints, - "parameters": parameters, - "parameters_bounds": parameters_bounds, - "parameters_init": parameters_init, - "parameter_objectives": parameter_objectives, - "use_sx": input_dict["use_sx"], - "ode_solver": input_dict["ode_solver"], - "n_threads": input_dict["n_threads"], - "control_type": input_dict["control_type"], - } - - return optimization_dict - - @staticmethod - def prepare_ocp( - model: FesModel = None, - final_time: int | float = None, - pulse_width: dict = None, - pulse_intensity: dict = None, - objective: dict = None, - use_sx: bool = True, - ode_solver: OdeSolver.RK1 | OdeSolver.RK2 | OdeSolver.RK4 = OdeSolver.RK1(n_integration_steps=10), - control_type: ControlType = ControlType.CONSTANT, - n_threads: int = 1, - ): - """ - Prepares the Optimal Control Program (OCP) to be solved. - - Parameters - ---------- - model : FesModel - The model type used for the OCP. - final_time : int | float - The final time of the OCP. - pulse_width : dict - Dictionary containing parameters related to the duration of the pulse. - Optional if not using DingModelPulseWidthFrequency or DingModelPulseWidthFrequencyWithFatigue. - pulse_intensity : dict - Dictionary containing parameters related to the intensity of the pulse. - Optional if not using DingModelPulseIntensityFrequency or DingModelPulseIntensityFrequencyWithFatigue. - objective : dict - Dictionary containing parameters related to the optimization objective. - use_sx : bool - The nature of the CasADi variables. MX are used if False. - ode_solver : OdeSolver.RK1 | OdeSolver.RK2 | OdeSolver.RK4 - The ODE solver to use. - n_threads : int - The number of threads to use while solving (multi-threading if > 1). - control_type : ControlType - The type of control to use. - - Returns - ------- - OptimalControlProgram - The prepared Optimal Control Program. - """ - - input_dict = { - "model": model, - "n_shooting": OcpFes.prepare_n_shooting(model.stim_time, final_time), - "final_time": final_time, - "pulse_width": pulse_width, - "pulse_intensity": pulse_intensity, - "objective": objective, - "use_sx": use_sx, - "ode_solver": ode_solver, - "n_threads": n_threads, - "control_type": control_type, - } - - optimization_dict = OcpFes._prepare_optimization_problem(input_dict) - - return OptimalControlProgram( - bio_model=[optimization_dict["model"]], - dynamics=optimization_dict["dynamics"], - n_shooting=optimization_dict["n_shooting"], - phase_time=[optimization_dict["final_time"]], - objective_functions=optimization_dict["objective_functions"], - x_init=optimization_dict["x_init"], - x_bounds=optimization_dict["x_bounds"], - u_bounds=optimization_dict["u_bounds"], - u_init=optimization_dict["u_init"], - constraints=optimization_dict["constraints"], - parameters=optimization_dict["parameters"], - parameter_bounds=optimization_dict["parameters_bounds"], - parameter_init=optimization_dict["parameters_init"], - parameter_objectives=optimization_dict["parameter_objectives"], - control_type=optimization_dict["control_type"], - use_sx=optimization_dict["use_sx"], - ode_solver=optimization_dict["ode_solver"], - n_threads=optimization_dict["n_threads"], - ) - - @staticmethod - def prepare_n_shooting(stim_time, final_time): - """ - Prepare the n_shooting for the ocp in order to have a time step that is a multiple of the stimulation time. - - Returns - ------- - int - The number of shooting points - """ - # Represent the final time as a Fraction for exact arithmetic. - T_final = Fraction(final_time).limit_denominator() - n_shooting = 1 - - for t in stim_time: - # Convert the stimulation time to an exact fraction. - t_frac = Fraction(t).limit_denominator() - # Compute the normalized time: t / final_time. - # This fraction is automatically reduced to the lowest terms. - norm = t_frac / T_final - # The denominator in the reduced fraction gives the requirement. - d = norm.denominator - n_shooting = n_shooting * d // gcd(n_shooting, d) - - if n_shooting >= 1000: - print( - f"Warning: The number of shooting nodes is very high n = {n_shooting}.\n" - "The optimization might be long, consider using stimulation time with even spacing (common frequency)." - ) - - return n_shooting - - @staticmethod - def _fill_dict(pulse_width, pulse_intensity, objective): - """ - This method fills the provided dictionaries with default values if they are not set. - - Parameters - ---------- - pulse_width : dict - Dictionary containing parameters related to the duration of the pulse. - Expected keys are 'fixed', 'min', 'max', and 'bimapping'. - - pulse_intensity : dict - Dictionary containing parameters related to the intensity of the pulse. - Expected keys are 'fixed', 'min', 'max', and 'bimapping'. - - objective : dict - Dictionary containing parameters related to the objective of the optimization. - Expected keys are 'force_tracking', 'end_node_tracking', and 'custom'. - - Returns - ------- - Returns four dictionaries: pulse_width, pulse_intensity, and objective. - Each dictionary is filled with default values for any keys that were not initially set. - """ - - pulse_width = {} if pulse_width is None else pulse_width - default_pulse_width = { - "fixed": None, - "min": None, - "max": None, - "bimapping": False, - } - - pulse_intensity = {} if pulse_intensity is None else pulse_intensity - default_pulse_intensity = { - "fixed": None, - "min": None, - "max": None, - "bimapping": False, - } - - objective = {} if objective is None else objective - default_objective = { - "force_tracking": None, - "end_node_tracking": None, - "cycling": None, - "custom": None, - } - - pulse_width = {**default_pulse_width, **pulse_width} - pulse_intensity = {**default_pulse_intensity, **pulse_intensity} - objective = {**default_objective, **objective} - - return pulse_width, pulse_intensity, objective - - @staticmethod - def _sanity_check( - model=None, - n_shooting=None, - final_time=None, - objective=None, - use_sx=None, - ode_solver=None, - n_threads=None, - ): - if not isinstance(model, FesModel): - if isinstance(model, FesMskModel): - pass - else: - raise TypeError( - f"The current model type used is {type(model)}, it must be a FesModel type." - f"Current available models are: DingModelFrequency, DingModelFrequencyWithFatigue," - f"DingModelPulseWidthFrequency, DingModelPulseWidthFrequencyWithFatigue," - f"DingModelPulseIntensityFrequency, DingModelPulseIntensityFrequencyWithFatigue" - ) - - if not isinstance(n_shooting, int) or n_shooting < 0: - raise TypeError("n_shooting must be a positive int type") - - if not isinstance(final_time, int | float) or final_time < 0: - raise TypeError("final_time must be a positive int or float type") - - if objective["force_tracking"] is not None: - if isinstance(objective["force_tracking"], list): - if isinstance(objective["force_tracking"][0], np.ndarray) and isinstance( - objective["force_tracking"][1], np.ndarray - ): - if ( - len(objective["force_tracking"][0]) != len(objective["force_tracking"][1]) - or len(objective["force_tracking"]) != 2 - ): - raise ValueError( - "force_tracking time and force argument must be same length and force_tracking " - "list size 2" - ) - else: - raise TypeError("force_tracking argument must be np.ndarray type") - else: - raise TypeError("force_tracking must be list type") - - if objective["end_node_tracking"] is not None: - if not isinstance(objective["end_node_tracking"], int | float): - raise TypeError("end_node_tracking must be int or float type") - - if objective["custom"] is not None: - if not isinstance(objective["custom"], ObjectiveList): - raise TypeError("custom_objective must be a ObjectiveList type") - if not all(isinstance(x, Objective) for x in objective["custom"][0]): - raise TypeError("All elements in ObjectiveList must be an Objective type") - - if not isinstance( - ode_solver, - (OdeSolver.RK1, OdeSolver.RK2, OdeSolver.RK4, OdeSolver.COLLOCATION), - ): - raise TypeError("ode_solver must be a OdeSolver type") - - if not isinstance(use_sx, bool): - raise TypeError("use_sx must be a bool type") - - if not isinstance(n_threads, int): - raise TypeError("n_thread must be a int type") - - @staticmethod - def _build_fourier_coefficient(force_tracking): - return FourierSeries().compute_real_fourier_coeffs(force_tracking[0], force_tracking[1], 50) - - @staticmethod - def _build_parameters( + def set_parameters( model, - pulse_intensity, + max_pulse_intensity, use_sx, - **kwargs, ): parameters = ParameterList(use_sx=use_sx) parameters_bounds = BoundsList() parameters_init = InitialGuessList() - parameter_objectives = ParameterObjectiveList() - n_stim = len(model.stim_time) if isinstance(model, DingModelPulseIntensityFrequency): - if pulse_intensity["bimapping"]: - n_stim = 1 - - if pulse_intensity["fixed"]: - parameters.add( - name="pulse_intensity", - function=DingModelPulseIntensityFrequency.set_impulse_intensity, - size=n_stim, - scaling=VariableScaling("pulse_intensity", [1] * n_stim), - ) - if isinstance(pulse_intensity["fixed"], list): - parameters_bounds.add( - "pulse_intensity", - min_bound=np.array(pulse_intensity["fixed"]), - max_bound=np.array(pulse_intensity["fixed"]), - interpolation=InterpolationType.CONSTANT, - ) - parameters_init.add( - key="pulse_intensity", - initial_guess=np.array(pulse_intensity["fixed"]), - ) - else: - parameters_bounds.add( - "pulse_intensity", - min_bound=np.array([pulse_intensity["fixed"]] * n_stim), - max_bound=np.array([pulse_intensity["fixed"]] * n_stim), - interpolation=InterpolationType.CONSTANT, - ) - parameters_init["pulse_intensity"] = np.array([pulse_intensity["fixed"]] * n_stim) - - elif pulse_intensity["max"]: - parameters_bounds.add( - "pulse_intensity", - min_bound=[model.min_pulse_intensity()], - max_bound=[pulse_intensity["max"]], - interpolation=InterpolationType.CONSTANT, - ) - intensity_avg = (model.min_pulse_intensity() + pulse_intensity["max"]) / 2 - parameters_init["pulse_intensity"] = np.array([intensity_avg] * n_stim) - parameters.add( - name="pulse_intensity", - function=DingModelPulseIntensityFrequency.set_impulse_intensity, - size=n_stim, - scaling=VariableScaling("pulse_intensity", [1] * n_stim), - ) + parameters.add( + name="pulse_intensity", + function=DingModelPulseIntensityFrequency.set_impulse_intensity, + size=n_stim, + scaling=VariableScaling("pulse_intensity", [1] * n_stim), + ) + parameters_bounds.add( + "pulse_intensity", + min_bound=[model.min_pulse_intensity()], + max_bound=[max_pulse_intensity], + interpolation=InterpolationType.CONSTANT, + ) + intensity_avg = (model.min_pulse_intensity() + max_pulse_intensity) / 2 + parameters_init["pulse_intensity"] = np.array([intensity_avg] * n_stim) - return (parameters, parameters_bounds, parameters_init, parameter_objectives) + return parameters, parameters_bounds, parameters_init @staticmethod - def _build_constraints(model, n_shooting, stim_idx_at_node_list, bimapped_parameters=False, **kwargs): + def set_constraints(model, n_shooting, stim_idx_at_node_list): constraints = ConstraintList() - bimapped_parameters_idx_list = [0] - bimapped_for_nmpc = bimapped_parameters and "n_cycles_simultaneous" in kwargs - if bimapped_for_nmpc: - n_repeats = int(n_shooting / kwargs["n_cycles_simultaneous"]) - bimapped_parameters_idx_list = [i for i in range(kwargs["n_cycles_simultaneous"]) for _ in range(n_repeats)] - if isinstance(model, DingModelPulseIntensityFrequency): for i in range(n_shooting): - # last_stim_idx = stim_idx_at_node_list[i][-1] - model._sum_stim_truncation - # bimapped_parameters_idx = bimapped_parameters_idx_list[i] if bimapped_for_nmpc else \ - # bimapped_parameters_idx_list[0] - last_stim_idx = stim_idx_at_node_list[i][-1] - constraints.add( CustomConstraint.pulse_intensity_sliding_window_constraint, - # last_stim_index=last_stim_idx if not bimapped_parameters else bimapped_parameters_idx, last_stim_idx=last_stim_idx, muscle_name=model.muscle_name, node=i, @@ -438,7 +71,7 @@ def _build_constraints(model, n_shooting, stim_idx_at_node_list, bimapped_parame return constraints @staticmethod - def _declare_dynamics(model, numerical_data_timeseries=None): + def declare_dynamics(model, numerical_data_timeseries=None): dynamics = DynamicsList() dynamics.add( model.declare_ding_variables, @@ -450,7 +83,7 @@ def _declare_dynamics(model, numerical_data_timeseries=None): return dynamics @staticmethod - def _set_bounds(model): + def set_x_bounds(model): # ---- STATE BOUNDS REPRESENTATION ---- # # # |‾‾‾‾‾‾‾‾‾‾x_max_middle‾‾‾‾‾‾‾‾‾‾‾‾x_max_end‾ @@ -499,11 +132,10 @@ def _set_bounds(model): return x_bounds, x_init @staticmethod - def _set_u_bounds(model, max_bound: int | float): - # Controls bounds - u_bounds = BoundsList() - # Controls initial guess - u_init = InitialGuessList() + def set_u_bounds(model, max_bound: int | float): + u_bounds = BoundsList() # Controls bounds + u_init = InitialGuessList() # Controls initial guess + if isinstance(model, DingModelPulseWidthFrequency): u_init.add(key="last_pulse_width", initial_guess=[0], phase=0) min_pulse_width = model.pd0 if isinstance(model.pd0, int | float) else 0 @@ -515,19 +147,20 @@ def _set_u_bounds(model, max_bound: int | float): ) if isinstance(model, DingModelPulseIntensityFrequency): - u_init.add(key="pulse_intensity", initial_guess=[0] * model._sum_stim_truncation, phase=0) + u_init.add(key="pulse_intensity", initial_guess=[0] * model.sum_stim_truncation, phase=0) min_pulse_intensity = ( model.min_pulse_intensity() if isinstance(model.min_pulse_intensity(), int | float) else 0 ) u_bounds.add( "pulse_intensity", - min_bound=[min_pulse_intensity] * model._sum_stim_truncation, - max_bound=[max_bound] * model._sum_stim_truncation, + min_bound=[min_pulse_intensity] * model.sum_stim_truncation, + max_bound=[max_bound] * model.sum_stim_truncation, interpolation=InterpolationType.CONSTANT, ) return u_bounds, u_init + # TODO: Remove this method @staticmethod def _set_objective(n_shooting, objective): # Creates the objective for our problem @@ -569,9 +202,18 @@ def _set_objective(n_shooting, objective): return objective_functions @staticmethod - def update_model_param(model, parameters): - for param_key in parameters: - if parameters[param_key].function: - param_scaling = parameters[param_key].scaling.scaling - param_reduced = parameters[param_key].cx - parameters[param_key].function(model, param_reduced * param_scaling, **parameters[param_key].kwargs) + def check_and_adjust_dimensions_for_objective_fun(force_to_track, n_shooting, final_time): + if len(force_to_track[0]) != len(force_to_track[1]): + raise ValueError("force_tracking time and force argument must be same length") + if len(force_to_track) != 2: + raise ValueError("force_tracking list size 2") + + force_fourier_coefficient = FourierSeries().compute_real_fourier_coeffs( + force_to_track[0], force_to_track[1], 50 + ) + force_to_track = FourierSeries().fit_func_by_fourier_series_with_real_coeffs( + np.linspace(0, final_time, n_shooting + 1), + force_fourier_coefficient, + )[np.newaxis, :] + + return force_to_track diff --git a/cocofest/optimization/fes_ocp_dynamics.py b/cocofest/optimization/fes_ocp_dynamics.py index 5e0b72e..efc58ec 100644 --- a/cocofest/optimization/fes_ocp_dynamics.py +++ b/cocofest/optimization/fes_ocp_dynamics.py @@ -578,11 +578,11 @@ def _set_u_bounds_msk(u_bounds, u_init, bio_models, with_residual_torque): if isinstance(models[0], DingModelPulseIntensityFrequency): for model in models: key = "pulse_intensity_" + str(model.muscle_name) - u_init.add(key=key, initial_guess=[0] * model._sum_stim_truncation, phase=0) + u_init.add(key=key, initial_guess=[0] * model.sum_stim_truncation, phase=0) u_bounds.add( key=key, - min_bound=[model.min_pulse_intensity()] * model._sum_stim_truncation, - max_bound=[130] * model._sum_stim_truncation, + min_bound=[model.min_pulse_intensity()] * model.sum_stim_truncation, + max_bound=[130] * model.sum_stim_truncation, phase=0, ) diff --git a/examples/fes_multibody/cycling/cycling_with_different_driven_methods_in_nmpc.py b/examples/fes_multibody/cycling/cycling_with_different_driven_methods_in_nmpc.py index a9122d4..13456ab 100644 --- a/examples/fes_multibody/cycling/cycling_with_different_driven_methods_in_nmpc.py +++ b/examples/fes_multibody/cycling/cycling_with_different_driven_methods_in_nmpc.py @@ -59,7 +59,7 @@ def advance_window_initial_guess_states(self, sol, n_cycles_simultaneous=None): return True def update_stim(self, sol): - truncation_term = self.nlp[0].model.muscles_dynamics_model[0]._sum_stim_truncation + truncation_term = self.nlp[0].model.muscles_dynamics_model[0].sum_stim_truncation solution_stimulation_time = self.nlp[0].model.muscles_dynamics_model[0].stim_time[-truncation_term:] previous_stim_time = [x - self.phase_time[0] for x in solution_stimulation_time] for i in range(len(self.nlp[0].model.muscles_dynamics_model)): diff --git a/examples/getting_started/force_tracking_parameter_optimization.py b/examples/getting_started/force_tracking_parameter_optimization.py index 613d2cc..60f29e4 100644 --- a/examples/getting_started/force_tracking_parameter_optimization.py +++ b/examples/getting_started/force_tracking_parameter_optimization.py @@ -6,7 +6,7 @@ import matplotlib.pyplot as plt import numpy as np -from bioptim import SolutionMerge +from bioptim import SolutionMerge, ObjectiveList, ObjectiveFcn, OptimalControlProgram, ControlType, OdeSolver, Node from cocofest import ( ModelMaker, FourierSeries, @@ -14,51 +14,81 @@ ) -def prepare_ocp(force_tracking): - # --- Build ocp --- # - # This ocp was build to track a force curve along the problem. - # The stimulation won't be optimized and is already set to one pulse every 0.1 seconds (n_stim/final_time). - # Plus the pulsation intensity will be optimized between 0 and 130 mA and are not the same across the problem. - model = ModelMaker.create_model("hmed2018", stim_time=list(np.linspace(0, 1, 34)[:-1])) - minimum_pulse_intensity = model.min_pulse_intensity() +def prepare_ocp(model, final_time, pi_max, force_tracking): + # --- Set dynamics --- # + n_shooting = model.get_n_shooting(final_time=final_time) + numerical_data_time_series, stim_idx_at_node_list = model.get_numerical_data_time_series(n_shooting, final_time) + dynamics = OcpFes.declare_dynamics(model, numerical_data_time_series) + + # --- Set initial guesses and bounds for states and controls --- # + x_bounds, x_init = OcpFes.set_x_bounds(model) + u_bounds, u_init = OcpFes.set_u_bounds(model, max_bound=pi_max) + + # --- Set objective functions --- # + objective_functions = ObjectiveList() + force_to_track = OcpFes.check_and_adjust_dimensions_for_objective_fun( + force_to_track=force_tracking, n_shooting=n_shooting, final_time=final_time + ) + objective_functions.add( + ObjectiveFcn.Lagrange.TRACK_STATE, + key="F", + weight=100, + target=force_to_track, + node=Node.ALL, + quadratic=True, + ) - return OcpFes().prepare_ocp( + # --- Set parameters (required for intensity models) --- # + use_sx = True + parameters, parameters_bounds, parameters_init = OcpFes.set_parameters( model=model, - final_time=1, - pulse_intensity={ - "min": minimum_pulse_intensity, - "max": 130, - "bimapping": False, - }, - objective={"force_tracking": force_tracking}, - use_sx=True, - n_threads=8, + max_pulse_intensity=pi_max, + use_sx=use_sx, + ) + + # --- Set constraints (required for intensity models) --- # + constraints = OcpFes.set_constraints( + model, + n_shooting, + stim_idx_at_node_list, + ) + + return OptimalControlProgram( + bio_model=[model], + dynamics=dynamics, + n_shooting=n_shooting, + phase_time=final_time, + objective_functions=objective_functions, + x_init=x_init, + x_bounds=x_bounds, + u_bounds=u_bounds, + u_init=u_init, + parameters=parameters, + parameter_bounds=parameters_bounds, + parameter_init=parameters_init, + constraints=constraints, + control_type=ControlType.CONSTANT, + use_sx=use_sx, + ode_solver=OdeSolver.RK4(n_integration_steps=10), + n_threads=20, ) def main(): + final_time = 1 + model = ModelMaker.create_model("hmed2018", stim_time=list(np.linspace(0, 1, 34)[:-1])) + # --- Building force to track ---# time = np.linspace(0, 1, 1001) force = abs(np.sin(time * 5) + np.random.normal(scale=0.1, size=len(time))) * 100 force_tracking = [time, force] - ocp = prepare_ocp(force_tracking) + ocp = prepare_ocp(model=model, final_time=final_time, pi_max=130, force_tracking=force_tracking) sol = ocp.solve() - controls = sol.stepwise_controls(to_merge=SolutionMerge.NODES) - time = sol.decision_time(to_merge=SolutionMerge.NODES).T[0] - for i in range(controls["pulse_intensity"].shape[0]): - plt.plot(time[:-1], controls["pulse_intensity"][i], label="stimulation intensity_" + str(i)) - plt.legend() - plt.show() - - parameters = sol.parameters - print(parameters["pulse_intensity"]) - - # --- Show the optimization results --- # - sol.graphs() # --- Show results from solution --- # - sol_merged = sol.decision_states(to_merge=[SolutionMerge.PHASES, SolutionMerge.NODES]) + sol_merged = sol.stepwise_states(to_merge=[SolutionMerge.NODES]) + sol_time = sol.stepwise_time(to_merge=[SolutionMerge.NODES]).T[0] fourier_fun = FourierSeries() fourier_coef = fourier_fun.compute_real_fourier_coeffs(time, force, 50) @@ -66,12 +96,8 @@ def main(): plt.title("Comparison between given and simulated force after parameter optimization") plt.plot(time, force, color="red", label="force from file") plt.plot(time, y_approx, color="orange", label="force after fourier transform") - - solution_time = sol.decision_time(to_merge=SolutionMerge.KEYS, continuous=True) - solution_time = [float(j) for j in solution_time] - plt.plot( - solution_time, + sol_time, sol_merged["F"].squeeze(), color="blue", label="force from optimized stimulation", @@ -81,6 +107,9 @@ def main(): plt.legend() plt.show() + # --- Show the optimization results --- # + sol.graphs() + if __name__ == "__main__": main() diff --git a/examples/getting_started/pulse_duration_optimization.py b/examples/getting_started/pulse_duration_optimization.py index 1400ed0..08f620a 100644 --- a/examples/getting_started/pulse_duration_optimization.py +++ b/examples/getting_started/pulse_duration_optimization.py @@ -1,47 +1,57 @@ """ This example will do a 10 stimulation example with Ding's 2007 pulse width and frequency model. -The stimulation will optimized the pulsation width between 0 and 0.0006 seconds to match a force value of 200N at the +The stimulation will be optimized the pulsation width between 0 and 0.0006 seconds to match a force value of 200N at the end of the last node while minimizing the muscle force state. """ import numpy as np -from bioptim import Solver, OdeSolver, ObjectiveList, ObjectiveFcn +from bioptim import Solver, OdeSolver, ObjectiveList, ObjectiveFcn, OptimalControlProgram, ControlType, Node from cocofest import OcpFes, ModelMaker, FES_plot -def prepare_ocp(): - # --- Build ocp --- # - final_time = 0.2 - model = ModelMaker.create_model( - "ding2007_with_fatigue", - sum_stim_truncation=10, - stim_time=list(np.linspace(0, final_time, 11)[:-1]), - previous_stim={"time": [-0.15, -0.10, -0.05], "pulse_width": [0.0005, 0.0005, 0.0005]}, - ) +def prepare_ocp(model, final_time, pw_max): + # --- Set dynamics --- # + n_shooting = model.get_n_shooting(final_time=final_time) + numerical_data_time_series, stim_idx_at_node_list = model.get_numerical_data_time_series(n_shooting, final_time) + dynamics = OcpFes.declare_dynamics(model, numerical_data_time_series) + + # --- Set initial guesses and bounds for states and controls --- # + x_bounds, x_init = OcpFes.set_x_bounds(model) + u_bounds, u_init = OcpFes.set_u_bounds(model, max_bound=pw_max) - # --- Minimize muscle force objective function --- # + # --- Set objective functions --- # objective_functions = ObjectiveList() objective_functions.add(ObjectiveFcn.Lagrange.MINIMIZE_STATE, key="F", weight=1, quadratic=True) + objective_functions.add( + ObjectiveFcn.Mayer.MINIMIZE_STATE, key="F", node=Node.END, target=200, weight=1e5, quadratic=True + ) - # --- Prepare ocp --- # - minimum_pulse_width = model.pd0 - return OcpFes().prepare_ocp( - model=model, - final_time=final_time, - pulse_width={ - "min": minimum_pulse_width, - "max": 0.0006, - "bimapping": False, - }, - objective={"end_node_tracking": 200, "custom": objective_functions}, + return OptimalControlProgram( + bio_model=[model], + dynamics=dynamics, + n_shooting=n_shooting, + phase_time=final_time, + objective_functions=objective_functions, + x_init=x_init, + x_bounds=x_bounds, + u_bounds=u_bounds, + u_init=u_init, + control_type=ControlType.CONSTANT, use_sx=True, - n_threads=8, ode_solver=OdeSolver.RK4(n_integration_steps=10), + n_threads=20, ) def main(): - ocp = prepare_ocp() + final_time = 0.2 + model = ModelMaker.create_model( + "ding2007_with_fatigue", + sum_stim_truncation=10, + stim_time=list(np.linspace(0, final_time, 11)[:-1]), + previous_stim={"time": [-0.15, -0.10, -0.05]}, + ) + ocp = prepare_ocp(model=model, final_time=final_time, pw_max=0.0006) # --- Solve the program --- # sol = ocp.solve(Solver.IPOPT()) diff --git a/examples/getting_started/pulse_duration_optimization_nmpc_cyclic.py b/examples/getting_started/pulse_duration_optimization_nmpc_cyclic.py index fbc1659..0943824 100644 --- a/examples/getting_started/pulse_duration_optimization_nmpc_cyclic.py +++ b/examples/getting_started/pulse_duration_optimization_nmpc_cyclic.py @@ -51,12 +51,12 @@ def prepare_nmpc( total_cycle_len, total_cycle_duration ) - dynamics = OcpFes._declare_dynamics(model, numerical_data_time_series) + dynamics = OcpFes.declare_dynamics(model, numerical_data_time_series) - x_bounds, x_init = OcpFes._set_bounds(model) - u_bounds, u_init = OcpFes._set_u_bounds(model, max_pulse_width) + x_bounds, x_init = OcpFes.set_x_bounds(model) + u_bounds, u_init = OcpFes.set_u_bounds(model, max_pulse_width) - constraints = OcpFes._build_constraints( + constraints = OcpFes.set_constraints( model, total_cycle_len, stim_idx_at_node_list, diff --git a/examples/getting_started/pulse_intensity_optimization.py b/examples/getting_started/pulse_intensity_optimization.py index e8da683..0b6ebb3 100644 --- a/examples/getting_started/pulse_intensity_optimization.py +++ b/examples/getting_started/pulse_intensity_optimization.py @@ -4,32 +4,75 @@ """ import numpy as np -from bioptim import OdeSolver +from bioptim import OdeSolver, ObjectiveList, ObjectiveFcn, OptimalControlProgram, ControlType, Node from cocofest import DingModelPulseIntensityFrequencyWithFatigue, OcpFes # --- Build ocp --- # -# This ocp was build to match a force value of 200N at the end of the last node. +# This ocp was build to match a force value of 135 N at the end of the last node. # The stimulation won't be optimized and is already set to one pulse every 0.1 seconds (n_stim/final_time). # Plus the pulsation intensity will be optimized between 0 and 130 mA and are not the same across the problem. -def prepare_ocp(): - final_time = 1 - model = DingModelPulseIntensityFrequencyWithFatigue(stim_time=list(np.linspace(0, final_time, 11)[:-1])) +def prepare_ocp(model, final_time, pi_max): + # --- Set dynamics --- # + n_shooting = model.get_n_shooting(final_time=final_time) + numerical_data_time_series, stim_idx_at_node_list = model.get_numerical_data_time_series(n_shooting, final_time) + dynamics = OcpFes.declare_dynamics(model, numerical_data_time_series) + + # --- Set initial guesses and bounds for states and controls --- # + x_bounds, x_init = OcpFes.set_x_bounds(model) + u_bounds, u_init = OcpFes.set_u_bounds(model, max_bound=pi_max) - return OcpFes().prepare_ocp( + # --- Set objective functions --- # + objective_functions = ObjectiveList() + objective_functions.add(ObjectiveFcn.Lagrange.MINIMIZE_STATE, key="F", weight=1, quadratic=True) + objective_functions.add( + ObjectiveFcn.Mayer.MINIMIZE_STATE, key="F", node=Node.END, target=135, weight=1e5, quadratic=True + ) + + # --- Set parameters (required for intensity models) --- # + use_sx = True + parameters, parameters_bounds, parameters_init = OcpFes.set_parameters( model=model, - final_time=final_time, - pulse_intensity={"max": 130}, - objective={"end_node_tracking": 200}, - use_sx=True, - n_threads=8, + max_pulse_intensity=pi_max, + use_sx=use_sx, + ) + + # --- Set constraints (required for intensity models) --- # + constraints = OcpFes.set_constraints( + model, + n_shooting, + stim_idx_at_node_list, + ) + + return OptimalControlProgram( + bio_model=[model], + dynamics=dynamics, + n_shooting=n_shooting, + phase_time=final_time, + objective_functions=objective_functions, + x_init=x_init, + x_bounds=x_bounds, + u_bounds=u_bounds, + u_init=u_init, + parameters=parameters, + parameter_bounds=parameters_bounds, + parameter_init=parameters_init, + constraints=constraints, + control_type=ControlType.CONSTANT, + use_sx=use_sx, ode_solver=OdeSolver.RK4(n_integration_steps=10), + n_threads=20, ) def main(): - ocp = prepare_ocp() + final_time = 1 + model = DingModelPulseIntensityFrequencyWithFatigue( + stim_time=list(np.linspace(0, final_time, 11)[:-1]), sum_stim_truncation=10 + ) + pi_max = 130 + ocp = prepare_ocp(model=model, final_time=final_time, pi_max=pi_max) # --- Solve the program --- # sol = ocp.solve() # --- Show results --- #