diff --git a/ceruleo/results/cv.py b/ceruleo/results/cv.py
new file mode 100644
index 00000000..b2442c0b
--- /dev/null
+++ b/ceruleo/results/cv.py
@@ -0,0 +1,208 @@
+from typing import Dict, List, Optional, Tuple
+
+import numpy as np
+from sklearn.metrics import mean_absolute_error as mae
+from sklearn.metrics import mean_absolute_percentage_error as mape
+from sklearn.metrics import mean_squared_error as mse
+from uncertainties import ufloat
+
+from ceruleo.results.results import PredictionResult
+from ceruleo.results.utils import compute_sample_weight
+
+
+def cv_regression_metrics_single_model(
+    results: List[PredictionResult], threshold: float = np.inf
+):
+    errors = {"MAE": [], "MAE SW": [], "MSE": [], "MSE SW": [], "MAPE": []}
+    for result in results:
+        y_mask = np.where(result.true_RUL <= threshold)[0]
+        y_true = np.squeeze(result.true_RUL[y_mask])
+        y_pred = np.squeeze(result.predicted_RUL[y_mask])
+        mask = np.isfinite(y_pred)
+        y_pred = y_pred[mask]
+        y_true = y_true[mask]
+
+        if len(np.unique(y_pred)) == 1:
+            continue
+
+        sw = compute_sample_weight(
+            "relative",
+            y_true,
+            y_pred,
+        )
+        try:
+            MAE_SW = mae(
+                y_true,
+                y_pred,
+                sample_weight=sw,
+            )
+        except Exception:
+            MAE_SW = np.nan
+        try:
+            MAE = mae(y_true, y_pred)
+        except Exception:
+            MAE = np.nan
+
+        try:
+            MSE_SW = mse(
+                y_true,
+                y_pred,
+                sample_weight=sw,
+            )
+        except Exception:
+            MSE_SW = np.nan
+
+        try:
+            MSE = mse(y_true, y_pred)
+        except Exception:
+            MSE = np.nan
+
+        try:
+            MAPE = mape(y_true, y_pred)
+        except Exception:
+            MAPE = np.nan
+
+        errors["MAE"].append(MAE)
+        errors["MAE SW"].append(MAE_SW)
+        errors["MSE"].append(MSE)
+        errors["MSE SW"].append(MSE_SW)
+        errors["MAPE"].append(MAPE)
+
+    summary = {}
+    for k in errors.keys():
+        summary[k] = ufloat(
+            np.round(np.mean(errors[k]), 2), np.round(np.std(errors[k]), 2)
+        )
+    return summary
+
+
+def cv_regression_metrics(
+    results_dict: Dict[str, List[PredictionResult]], threshold: float = np.inf
+) -> dict:
+    """
+    Compute regression metrics for each model
+
+    Parameters:
+        results_dict: Dictionary that maps each model name to its fold results
+        threshold: Compute the metrics only on RUL values below the threshold
+
+    Returns:
+        A dictionary that maps each model name to its metrics
+        ('MAE', 'MAE SW', 'MSE', 'MSE SW', 'MAPE'), where every metric is a
+        ufloat carrying the mean and the standard deviation across folds
+    """
+    out = {}
+    for model_name in results_dict.keys():
+        out[model_name] = cv_regression_metrics_single_model(
+            results_dict[model_name], threshold
+        )
+    return out
+
+
+def model_cv_results(
+    results: List[PredictionResult],
+    nbins: Optional[int] = None,
+    bin_edges: Optional[np.ndarray] = None,
+) -> "CVResults":
+    if nbins is None and bin_edges is None:
+        raise ValueError("nbins and bin_edges cannot be both None")
+    if nbins is None:
+        nbins = len(bin_edges) - 1
+    if bin_edges is None:
+        max_y_value = np.max([r.true_RUL.max() for r in results])
+        bin_edges = np.linspace(0, max_y_value, nbins + 1)
+
+    trues = []
+    predicted = []
+    for result in results:
+        trues.append(result.true_RUL)
+        predicted.append(result.predicted_RUL)
+    return CVResults(trues, predicted, nbins=nbins, bin_edges=bin_edges)
+
+
+def models_cv_results(
+    results_dict: Dict[str, List[PredictionResult]], nbins: int
+) -> Tuple[np.ndarray, Dict[str, "CVResults"]]:
+    """Create a dictionary with the result of each cross validation of the model"""
+    max_y_value = np.max(
+        [
+            r.true_RUL.max()
+            for model_name in results_dict.keys()
+            for r in results_dict[model_name]
+        ]
+    )
+    bin_edges = np.linspace(0, max_y_value, nbins + 1)
+    model_results = {}
+    for model_name in results_dict.keys():
+        model_results[model_name] = model_cv_results(
+            results_dict[model_name], bin_edges=bin_edges
+        )
+
+    return bin_edges, model_results
+
+
+class CVResults:
+    """
+    Compute the error histogram
+
+    Compute the error with respect to the RUL considering the results of
+    different folds
+
+    Parameters:
+        y_true: List with the true values of each hold-out set of a cross validation
+        y_pred: List with the predictions of each hold-out set of a cross validation
+        nbins: Number of bins to compute the histogram
+    """
+
+    def __init__(
+        self,
+        y_true: List[List],
+        y_pred: List[List],
+        nbins: int = 5,
+        bin_edges: Optional[np.ndarray] = None,
+    ):
+        if bin_edges is None:
+            max_value = np.max([np.max(y) for y in y_true])
+            bin_edges = np.linspace(0, max_value, nbins + 1)
+        self.n_folds = len(y_true)
+        self.n_bins = len(bin_edges) - 1
+        self.bin_edges = bin_edges
+        self.mean_error = np.zeros((self.n_folds, self.n_bins))
+        self.mae = np.zeros((self.n_folds, self.n_bins))
+        self.mse = np.zeros((self.n_folds, self.n_bins))
+        self.errors = []
+        for i, (fold_y_pred, fold_y_true) in enumerate(zip(y_pred, y_true)):
+            self._add_fold_result(i, fold_y_pred, fold_y_true)
+
+    def _add_fold_result(self, fold: int, y_pred: np.ndarray, y_true: np.ndarray):
+        y_pred = np.squeeze(y_pred)
+        y_true = np.squeeze(y_true)
+
+        for j in range(len(self.bin_edges) - 1):
+            mask = (y_true >= self.bin_edges[j]) & (y_true <= self.bin_edges[j + 1])
+            indices = np.where(mask)[0]
+
+            if len(indices) == 0:
+                continue
+
+            errors = y_true[indices] - y_pred[indices]
+
+            self.mean_error[fold, j] = np.mean(errors)
+            self.mae[fold, j] = np.mean(np.abs(errors))
+            self.mse[fold, j] = np.mean(errors**2)
+            self.errors.append(errors)
\ No newline at end of file
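A minimal usage sketch of the new `cv.py` module, assuming the `PredictionResult` constructor shown in `results.py` below. The model name, RUL values, and noise level are illustrative only:

```python
import numpy as np

from ceruleo.results.cv import cv_regression_metrics
from ceruleo.results.results import PredictionResult

# Two synthetic folds: a linearly decreasing RUL with noisy predictions.
rng = np.random.default_rng(42)
true_RUL = np.linspace(100, 0, 200)
folds = [
    PredictionResult(
        name="model",
        true_RUL=true_RUL,
        predicted_RUL=true_RUL + rng.normal(0, 5, true_RUL.shape),
    )
    for _ in range(2)
]

metrics = cv_regression_metrics({"model": folds})
print(metrics["model"]["MAE"])  # a ufloat: mean +/- std across the folds
```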
diff --git a/ceruleo/results/metrics.py b/ceruleo/results/metrics.py
new file mode 100644
index 00000000..28aa470c
--- /dev/null
+++ b/ceruleo/results/metrics.py
@@ -0,0 +1,104 @@
+from typing import List, Tuple
+
+import numpy as np
+
+from ceruleo.results.results import FittedLife, PredictionResult, split_lives
+
+# Small constant that avoids a division by zero when normalizing the costs
+EPSILON = 1e-10
+
+
+def unexploited_lifetime_from_cv(
+    lives: List[List[FittedLife]], window_size: int, n: int
+) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
+    std_per_window = []
+    mean_per_window = []
+    windows = np.linspace(0, window_size, n)
+    for m in windows:
+        jj = []
+        for r in lives:
+            ul_cv_list = [life.unexploited_lifetime(m) for life in r]
+            jj.extend(ul_cv_list)
+        mean_per_window.append(np.mean(jj))
+        std_per_window.append(np.std(jj))
+
+    return windows, np.array(mean_per_window), np.array(std_per_window)
+
+
+def unexpected_breaks(
+    d: List[PredictionResult], window_size: int, step: int
+) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
+    """
+    Compute the risk of unexpected breaks with respect to the maintenance window size
+
+    Parameters:
+        d: List with the prediction results of each fold
+        window_size: Maximum size of the maintenance window
+        step: Number of window sizes in which to compute the risk
+
+    Returns:
+        A tuple of np.arrays with:
+            - Maintenance window sizes evaluated
+            - Mean risk computed for every window size used
+            - Standard deviation of the risk for every window size used
+    """
+    bb = [split_lives(fold) for fold in d]
+    return unexpected_breaks_from_cv(bb, window_size, step)
+
+
+def unexpected_breaks_from_cv(
+    lives: List[List[FittedLife]], window_size: int, n: int
+) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
+    """
+    Compute the risk of unexpected breaks given the results of a cross validation
+
+    Parameters:
+        lives: Cross-validation results
+        window_size: Maximum size of the maintenance window
+        n: Number of points in which to evaluate the risk of unexpected breaks
+
+    Returns:
+        A tuple of np.arrays with:
+            - Maintenance window sizes evaluated
+            - Mean risk computed for every window size used
+            - Standard deviation of the risk for every window size used
+    """
+    std_per_window = []
+    mean_per_window = []
+    windows = np.linspace(0, window_size, n)
+    for m in windows:
+        jj = []
+        for r in lives:
+            ul_cv_list = [life.unexpected_break(m) for life in r]
+            jj.extend(ul_cv_list)
+        mean_per_window.append(np.mean(jj))
+        std_per_window.append(np.std(jj))
+    return windows, np.array(mean_per_window), np.array(std_per_window)
+
+
+def metric_J_from_cv(
+    lives: List[List[FittedLife]], window_size: int, n: int, q1: float, q2: float
+):
+    J = []
+    windows = np.linspace(0, window_size, n)
+    for m in windows:
+        J_of_m = []
+        for r in lives:
+            ub_cv_list = np.array([life.unexpected_break(m) for life in r])
+            ub_cv_list = (ub_cv_list / (np.max(ub_cv_list) + EPSILON)) * q1
+            ul_cv_list = np.array([life.unexploited_lifetime(m) for life in r])
+            ul_cv_list = (ul_cv_list / (np.max(ul_cv_list) + EPSILON)) * q2
+            values = ub_cv_list + ul_cv_list
+            J_of_m.append(np.mean(values))
+        J.append(np.mean(J_of_m))
+    return windows, J
+
+
+def metric_J(
+    d: List[PredictionResult], window_size: int, step: int, q1: float = 1.0, q2: float = 1.0
+):
+    # q1 and q2 weight the unexpected-break and the unexploited-lifetime
+    # terms; with the defaults both costs contribute equally.
+    lives_cv = [split_lives(cv) for cv in d]
+    return metric_J_from_cv(lives_cv, window_size, step, q1, q2)
+
+
+def unexploited_lifetime(d: List[PredictionResult], window_size: int, step: int):
+    bb = [split_lives(cv) for cv in d]
+    return unexploited_lifetime_from_cv(bb, window_size, step)
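A sketch of how the relocated maintenance-window metrics fit together, reusing the synthetic `folds` list from the previous sketch; the window size and step values are arbitrary:

```python
from ceruleo.results.metrics import metric_J, unexpected_breaks, unexploited_lifetime

# Both curves are evaluated on the same grid of maintenance window sizes.
windows, ul_mean, ul_std = unexploited_lifetime(folds, window_size=30, step=10)
_, ub_mean, ub_std = unexpected_breaks(folds, window_size=30, step=10)

# Small windows waste little remaining life but break more often; large
# windows do the opposite. metric_J combines both normalized costs.
windows, J = metric_J(folds, window_size=30, step=10)
```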
diff --git a/ceruleo/results/results.py b/ceruleo/results/results.py
index d3c050e6..53cd8b50 100644
--- a/ceruleo/results/results.py
+++ b/ceruleo/results/results.py
@@ -26,9 +26,10 @@
 """
 import logging
 from dataclasses import dataclass, field
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Dict, List, Literal, Optional, Tuple, Union
 
 import numpy as np
+from pydantic import BaseModel, Field
 from sklearn.metrics import mean_absolute_error as mae
 from sklearn.metrics import mean_absolute_percentage_error as mape
 from sklearn.metrics import mean_squared_error as mse
@@ -38,18 +39,19 @@
     PiecewesieLinearFunction,
     PiecewiseLinearRegression,
 )
+from ceruleo.results.utils import compute_sample_weight, split_lives_indices
 
 logger = logging.getLogger(__name__)
 
 
-@dataclass
-class MetricsResult:
+
+class MetricsResult(BaseModel):
     """An object that store regression metrics and times"""
 
     mae: float
    mse: float
-    fitting_time: float = field(default_factory=lambda: 0)
-    prediction_time: float = field(default_factory=lambda: 0)
+    fitting_time: float = Field(default=0)
+    prediction_time: float = Field(default=0)
 
 
 @dataclass
@@ -59,7 +61,7 @@ class PredictionResult:
     name: str
     true_RUL: np.ndarray
     predicted_RUL: np.ndarray
-    metrics: MetricsResult = field(default_factory=lambda: MetricsResult(0, 0))
+    metrics: MetricsResult = field(default_factory=lambda: MetricsResult(mae=0, mse=0))
 
     def compute_metrics(self):
         self.metrics.mae = mae(self.true_RUL, self.predicted_RUL)
@@ -71,121 +73,6 @@ def __post_init__(self):
         self.compute_metrics()
 
 
-def compute_sample_weight(sample_weight, y_true, y_pred, c: float = 0.9):
-    if sample_weight == "relative":
-        sample_weight = np.abs(y_true - y_pred) / (np.clip(y_true, c, np.inf))
-    else:
-        sample_weight = 1
-    return sample_weight
-
-
-def compute_rul_line(rul: float, n: int, tt: Optional[np.array] = None):
-    if tt is None:
-        tt = -np.ones(n)
-    z = np.zeros(n)
-    z[0] = rul
-    for i in range(len(tt) - 1):
-        z[i + 1] = max(z[i] + tt[i], 0)
-        if z[i + 1] - 0 < 0.0000000000001:
-            break
-    return z
-
-
-class CVResults:
-    """
-    Compute the error histogram
-
-    Compute the error with respect to the RUL considering the results of different
-    folds
-
-    Parameters:
-        y_true: List with the true values of each hold-out set of a cross validation
-        y_pred: List with the predictions of each hold-out set of a cross validation
-        nbins: Number of bins to compute the histogram
-
-    """
-
-    def __init__(
-        self,
-        y_true: List[List],
-        y_pred: List[List],
-        nbins: int = 5,
-        bin_edges: Optional[np.array] = None,
-    ):
-        if bin_edges is None:
-            max_value = np.max([np.max(y) for y in y_true])
-            bin_edges = np.linspace(0, max_value, nbins + 1)
-        self.n_folds = len(y_true)
-        self.n_bins = len(bin_edges) - 1
-        self.bin_edges = bin_edges
-        self.mean_error = np.zeros((self.n_folds, self.n_bins))
-        self.mae = np.zeros((self.n_folds, self.n_bins))
-        self.mse = np.zeros((self.n_folds, self.n_bins))
-        self.errors = []
-        for i, (y_pred, y_true) in enumerate(zip(y_pred, y_true)):
-            self._add_fold_result(i, y_pred, y_true)
-
-    def _add_fold_result(self, fold: int, y_pred: np.array, y_true: np.array):
-        y_pred = np.squeeze(y_pred)
-        y_true = np.squeeze(y_true)
-
-        for j in range(len(self.bin_edges) - 1):
-            mask = (y_true >= self.bin_edges[j]) & (y_true <= self.bin_edges[j + 1])
-            indices = np.where(mask)[0]
-
-            if len(indices) == 0:
-                continue
-
-            errors = y_true[indices] - y_pred[indices]
-
-            self.mean_error[fold, j] = np.mean(errors)
-
-            self.mae[fold, j] = np.mean(np.abs(errors))
-            self.mse[fold, j] = np.mean((errors) ** 2)
-            self.errors.append(errors)
-
-
-def model_cv_results(
-    results: List[PredictionResult],
-    nbins: Optional[int] = None,
-    bin_edges: Optional[np.ndarray] = None,
-) -> CVResults:
-    if nbins is None and bin_edges is None:
-        raise ValueError("nbins and bin_edges cannot be both None")
-    if nbins is None:
-        nbins = len(bin_edges) - 1
-    if bin_edges is None:
-        max_y_value = np.max([r.true_RUL.max() for r in results])
-        bin_edges = np.linspace(0, max_y_value, nbins + 1)
-
-    trues = []
-    predicted = []
-    for results in results:
-        trues.append(results.true_RUL)
-        predicted.append(results.predicted_RUL)
-    return CVResults(trues, predicted, nbins=nbins, bin_edges=bin_edges)
-
-
-def models_cv_results(
-    results_dict: Dict[str, List[PredictionResult]], nbins: int
-) -> Tuple[np.ndarray, Dict[str, CVResults]]:
-    """Create a dictionary with the result of each cross validation of the model"""
-    max_y_value = np.max(
-        [
-            r.true_RUL.max()
-            for model_name in results_dict.keys()
-            for r in results_dict[model_name]
-        ]
-    )
-    bin_edges = np.linspace(0, max_y_value, nbins + 1)
-    model_results = {}
-    for model_name in results_dict.keys():
-        model_results[model_name] = model_cv_results(
-            results_dict[model_name], bin_edges=bin_edges
-        )
-
-    return bin_edges, model_results
-
 class FittedLife:
     """Represent a Fitted run-to-cycle failure
 
@@ -201,10 +88,10 @@
 
     def __init__(
        self,
-        y_true: np.array,
-        y_pred: np.array,
-        time: Optional[Union[np.array, int]] = None,
-        fit_line_not_increasing: Optional[bool] = False,
+        y_true: np.ndarray,
+        y_pred: np.ndarray,
+        time: Optional[Union[np.ndarray, int]] = None,
+        fit_line_not_increasing: bool = False,
         RUL_threshold: Optional[float] = None,
     ):
         self.fit_line_not_increasing = fit_line_not_increasing
@@ -216,7 +103,7 @@
             if isinstance(time, np.ndarray):
                 self.time = time
             else:
-                self.time = np.array(np.linspace(0, y_true[0], n=len(y_true)))
+                self.time = np.linspace(start=0, stop=int(y_true[0]), num=len(y_true))
         else:
 
             self.degrading_start, self.time = FittedLife.compute_time_feature(
@@ -240,7 +127,7 @@
 
     @staticmethod
     def compute_time_feature(
-        y_true: np.array, RUL_threshold: Optional[float] = None
+        y_true: np.ndarray, RUL_threshold: Optional[float] = None
     ) -> Tuple[float, np.ndarray]:
         """Compute the time feature based on the target
 
@@ -258,7 +145,7 @@
     @staticmethod
     def _degrading_start(
         y_true: np.array, RUL_threshold: Optional[float] = None
-    ) -> float:
+    ) -> int:
         """
         Obtain the index when the life value is lower than the RUL_threshold
 
@@ -283,7 +170,7 @@
         return degrading_start
 
     @staticmethod
-    def _compute_time(y_true: np.array, degrading_start: int) -> np.array:
+    def _compute_time(y_true: np.ndarray, degrading_start_index: int) -> np.ndarray:
         """
         Compute the passage of time from the true RUL
 
@@ -302,18 +189,18 @@
         if len(y_true) == 1:
             return np.array([0])
 
-        time_diff = np.diff(np.squeeze(y_true)[degrading_start:][::-1])
+        time_diff = np.diff(np.squeeze(y_true)[degrading_start_index:][::-1])
         time = np.zeros(len(y_true))
-        if degrading_start > 0:
+        if degrading_start_index > 0:
             if len(time_diff) > 0:
-                time[0 : degrading_start + 1] = np.median(time_diff)
+                time[0 : degrading_start_index + 1] = np.median(time_diff)
             else:
-                time[0 : degrading_start + 1] = 1
-            time[degrading_start + 1 :] = time_diff
+                time[0 : degrading_start_index + 1] = 1
+            time[degrading_start_index + 1 :] = time_diff
         return np.cumsum(time)
 
-    def _fit_picewise_linear_regression(self, y: np.array) -> PiecewesieLinearFunction:
+    def _fit_picewise_linear_regression(self, y: np.ndarray) -> PiecewesieLinearFunction:
         """
         Fit the array trough a picewise linear regression
 
@@ -425,35 +312,13 @@ def unexpected_break(self, m: float = 0, tolerance: float = 0) -> bool:
         return True
 
 
-def split_lives_indices(y_true: np.array) -> List[List[int]]:
-    """
-    Obtain a list of indices for each life
-
-    Parameters:
-        y_true: True vector with the RUL
-
-    Returns:
-        A list with the indices belonging to each life
-    """
-    assert len(y_true) >= 2
-    lives_indices = (
-        [0]
-        + (np.where(np.diff(np.squeeze(y_true)) > 0)[0] + 1).tolist()
-        + [len(y_true)]
-    )
-    indices = []
-    for i in range(len(lives_indices) - 1):
-        r = range(lives_indices[i], lives_indices[i + 1])
-        if len(r) <= 1:
-            continue
-        indices.append(r)
-    return indices
 
 
 def split_lives(
     results: PredictionResult,
     RUL_threshold: Optional[float] = None,
-    fit_line_not_increasing: Optional[bool] = False,
+    fit_line_not_increasing: bool = False,
     time: Optional[int] = None,
 ) -> List[FittedLife]:
     """
@@ -483,232 +348,3 @@
     )
 
     return lives
-
-
-def unexploited_lifetime(d: PredictionResult, window_size: int, step: int):
-    bb = [split_lives(cv) for cv in d]
-    return unexploited_lifetime_from_cv(bb, window_size, step)
-
-
-def unexploited_lifetime_from_cv(
-    lives: List[List[FittedLife]], window_size: int, n: int
-):
-    std_per_window = []
-    mean_per_window = []
-    windows = np.linspace(0, window_size, n)
-    for m in windows:
-        jj = []
-        for r in lives:
-            ul_cv_list = [life.unexploited_lifetime(m) for life in r]
-
-            jj.extend(ul_cv_list)
-        mean_per_window.append(np.mean(jj))
-        std_per_window.append(np.std(jj))
-
-    return windows, np.array(mean_per_window), np.array(std_per_window)
-
-
-def unexpected_breaks(
-    d: List[PredictionResult], window_size: int, step: int
-) -> Tuple[np.ndarray, np.ndarray]:
-    """
-    Compute the risk of unexpected breaks with respect to the maintenance window size
-
-    Parameters:
-        d: Dictionary with the results
-        window_size: Maximum size of the maintenance windows
-        step: Number of points in which compute the risks.
-              step different maintenance windows will be used.
-
-    Returns:
-        A tuple of np.arrays with:
-            - Maintenance window size evaluated
-            - Risk computed for every window size used
-    """
-
-    bb = [split_lives(fold) for fold in d]
-    return unexpected_breaks_from_cv(bb, window_size, step)
-
-
-def unexpected_breaks_from_cv(
-    lives: List[List[FittedLife]], window_size: int, n: int
-) -> Tuple[np.ndarray, np.ndarray]:
-    """
-    Compute the risk of unexpected breaks given a Cross-Validation results
-
-    Parameters:
-        lives: Cross validation results.
-        window_size: Maximum size of the maintenance window
-        n: Number of points to evaluate the risk of unexpected breaks
-
-    Returns:
-        A tuple of np.arrays with:
-            - Maintenance window size evaluated
-            - Risk computed for every window size used
-    """
-    std_per_window = []
-    mean_per_window = []
-    windows = np.linspace(0, window_size, n)
-    for m in windows:
-        jj = []
-        for r in lives:
-            ul_cv_list = [life.unexpected_break(m) for life in r]
-            jj.extend(ul_cv_list)
-        mean_per_window.append(np.mean(jj))
-        std_per_window.append(np.std(jj))
-    return windows, np.array(mean_per_window), np.array(std_per_window)
-
-
-def metric_J_from_cv(lives: List[List[FittedLife]], window_size: int, n: int, q1, q2):
-    J = []
-    windows = np.linspace(0, window_size, n)
-    for m in windows:
-        J_of_m = []
-        for r in lives:
-            ub_cv_list = np.array([life.unexpected_break(m) for life in r])
-            ub_cv_list = (ub_cv_list / (np.max(ub_cv_list) + 0.0000000001)) * q1
-            ul_cv_list = np.array([life.unexploited_lifetime(m) for life in r])
-            ul_cv_list = (ul_cv_list / (np.max(ul_cv_list) + 0.0000000001)) * q2
-            values = ub_cv_list + ul_cv_list
-            mean_J = np.mean(values)
-            std_ul_cv = np.std(values)
-            J_of_m.append(mean_J)
-        J.append(np.mean(J_of_m))
-    return windows, J
-
-
-def metric_J(d, window_size: int, step: int):
-    lives_cv = [split_lives(cv) for cv in d]
-    return metric_J_from_cv(lives_cv, window_size, step)
-
-
-def cv_regression_metrics_single_model(
-    results: List[PredictionResult], threshold: float = np.inf
-):
-    errors = {"MAE": [], "MAE SW": [], "MSE": [], "MSE SW": [], "MAPE": []}
-    for result in results:
-        y_mask = np.where(result.true_RUL <= threshold)[0]
-        y_true = np.squeeze(result.true_RUL[y_mask])
-        y_pred = np.squeeze(result.predicted_RUL[y_mask])
-        mask = np.isfinite(y_pred)
-        y_pred = y_pred[mask]
-        y_true = y_true[mask]
-
-        if len(np.unique(y_pred)) == 1:
-            continue
-
-        sw = compute_sample_weight(
-            "relative",
-            y_true,
-            y_pred,
-        )
-        try:
-            MAE_SW = mae(
-                y_true,
-                y_pred,
-                sample_weight=sw,
-            )
-        except:
-            MAE_SW = np.nan
-        try:
-            MAE = mae(y_true, y_pred)
-        except:
-            MAE = np.nan
-
-        try:
-            MSE_SW = mse(
-                y_true,
-                y_pred,
-                sample_weight=sw,
-            )
-        except:
-            MSE_SW = np.nan
-
-        try:
-            MSE = mse(y_true, y_pred)
-        except:
-            MSE = np.nan
-
-        try:
-            MAPE = mape(y_true, y_pred)
-        except:
-            MAPE = np.nan
-
-        lives = split_lives(result)
-        errors["MAE"].append(MAE)
-        errors["MAE SW"].append(MAE_SW)
-        errors["MSE"].append(MSE)
-        errors["MSE SW"].append(MSE_SW)
-        errors["MAPE"].append(MAPE)
-
-    errors1 = {}
-    for k in errors.keys():
-        errors1[k] = ufloat(
-            np.round(np.mean(errors[k]), 2), np.round(np.std(errors[k]), 2)
-        )
-    return errors1
-
-
-def cv_regression_metrics(
-    results_dict: Dict[str, List[PredictionResult]], threshold: float = np.inf
-) -> dict:
-    """
-    Compute regression metrics for each model
-
-    Parameters:
-<<<<<<< HEAD
-
-        data: Dictionary with the model predictions.
-
-=======
-        data: Dictionary with the model predictions.
->>>>>>> 7862b3c56dbc36f72b3c7f87d9b39e1ae78b4ddc
-        threshold: Compute metrics errors only in RUL values less than the threshold
-
-    Returns:
-        A dictionary with the following structure:
-        d: { ['Model]:
-                {
-                    'MAE': {
-                        'mean':
-                        'std':
-                    },
-                    'MAE SW': {
-                        'mean':
-                        'std':
-                    },
-                    'MSE': {
-                        'mean':
-                        'std':
-                    },
-                }
-            ]
-
-<<<<<<< HEAD
-
-        d: { ['Model]:
-                {
-                    'MAE': {
-                        'mean':
-                        'std':
-                    },
-                    'MAE SW': {
-                        'mean':
-                        'std':
-                    },
-                    'MSE': {
-                        'mean':
-                        'std':
-                    },
-                }
-            ]
-
-=======
->>>>>>> 7862b3c56dbc36f72b3c7f87d9b39e1ae78b4ddc
-    """
-    out = {}
-    for model_name in results_dict.keys():
-        out[model_name] = cv_regression_metrics_single_model(
-            results_dict[model_name], threshold
-        )
-    return out
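To make the `FittedLife` refactor concrete, a sketch of splitting one prediction vector into run-to-failure cycles; the two synthetic lives and the window value of 10 are illustrative:

```python
import numpy as np

from ceruleo.results.results import PredictionResult, split_lives

# Two concatenated run-to-failure cycles: the RUL jumps up where a new life starts.
y_true = np.concatenate([np.linspace(80, 0, 81), np.linspace(50, 0, 51)])
y_pred = y_true + np.random.default_rng(0).normal(0, 3, y_true.shape)
result = PredictionResult(name="m", true_RUL=y_true, predicted_RUL=y_pred)

lives = split_lives(result)  # one FittedLife per cycle
assert len(lives) == 2
# Per-life costs for a maintenance window of 10 time units:
print(lives[0].unexploited_lifetime(10), lives[0].unexpected_break(10))
```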
diff --git a/ceruleo/results/utils.py b/ceruleo/results/utils.py
new file mode 100644
index 00000000..554f4c9d
--- /dev/null
+++ b/ceruleo/results/utils.py
@@ -0,0 +1,56 @@
+from enum import Enum
+from typing import List, Literal, Union
+
+import numpy as np
+
+
+class SampleWeightType(str, Enum):
+    """Sample weight type"""
+
+    relative = "relative"
+    one = "one"
+
+
+def compute_sample_weight(
+    sample_weight: SampleWeightType,
+    y_true: np.ndarray,
+    y_pred: np.ndarray,
+    c: float = 0.9,
+) -> Union[np.ndarray, Literal[1]]:
+    """Compute the sample weight for the regression metrics
+
+    Parameters:
+        sample_weight: Type of sample weight
+        y_true: RUL true values
+        y_pred: RUL predicted values
+        c: Lower clipping value for y_true. It prevents samples whose true
+           RUL is close to 0 from receiving an unbounded weight. Defaults to 0.9.
+
+    Returns:
+        The relative weight of each sample, or the scalar 1 when no
+        weighting is requested
+    """
+    if sample_weight == "relative":
+        return np.abs(y_true - y_pred) / np.clip(y_true, c, np.inf)
+    else:
+        return 1
+
+
+def split_lives_indices(y_true: np.ndarray) -> List[range]:
+    """Obtain a list of indices for each life
+
+    Parameters:
+        y_true: True vector with the RUL
+
+    Returns:
+        A list with the indices belonging to each life
+    """
+    assert len(y_true) >= 2
+    lives_indices = (
+        [0]
+        + (np.where(np.diff(np.squeeze(y_true)) > 0)[0] + 1).tolist()
+        + [len(y_true)]
+    )
+    indices = []
+    for i in range(len(lives_indices) - 1):
+        r = range(lives_indices[i], lives_indices[i + 1])
+        if len(r) <= 1:
+            continue
+        indices.append(r)
+    return indices
\ No newline at end of file
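A worked example of the relative weighting, showing why the clip constant `c` matters near the end of life; the numbers are illustrative:

```python
import numpy as np

from ceruleo.results.utils import compute_sample_weight

y_true = np.array([100.0, 10.0, 0.5])
y_pred = np.array([90.0, 15.0, 3.0])

# |error| / clip(true RUL, 0.9, inf):
# [10/100, 5/10, 2.5/0.9] -> [0.1, 0.5, ~2.78]
# Without the clip, the last sample would be weighted 2.5 / 0.5 = 5.
print(compute_sample_weight("relative", y_true, y_pred))
```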
diff --git a/tests/test_results.py b/tests/test_results.py
index 3272603e..713e0a4b 100644
--- a/tests/test_results.py
+++ b/tests/test_results.py
@@ -1,8 +1,17 @@
 import numpy as np
-from ceruleo.results.results import split_lives_indices
+from ceruleo.results.results import MetricsResult, split_lives_indices
 import pytest
 
 
 class TestResults:
+
+    def test_MetricResult(self):
+        qq = MetricsResult(mae=2.0, mse=2.0)
+        assert qq.mae == 2.0
+        assert qq.mse == 2.0
+
+    def test_split_lives(self):
+        q = split_lives_indices(np.array([10, 9, 8, 3, 1, 0, 15, 12, 3, 0]))
+        assert len(q) == 2
+        assert [range(0, 6), range(6, 10)] == q
+
     def test_1(self):
         v1 = np.linspace(25, 0, 50)
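Finally, a sketch of the binned error histogram that `cv.py` now exposes, reusing the `folds` list from the first sketch; the bin count is arbitrary:

```python
from ceruleo.results.cv import models_cv_results

bin_edges, per_model = models_cv_results({"model": folds}, nbins=4)
cv_res = per_model["model"]
print(cv_res.mae.shape)         # (n_folds, n_bins): per-fold MAE in each RUL bin
print(cv_res.mae.mean(axis=0))  # average absolute error per RUL range
```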