Dataset builder #32

Merged · 17 commits · Feb 24, 2024
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -28,14 +28,15 @@ jobs:
python -m pip install wheel setuptools cython
python -m pip install tensorflow
if [ -f requirements.txt ]; then python -m pip install -r requirements.txt; fi
-python -m pip install -e $GITHUB_WORKSPACE
+python -m pip install -e $GITHUB_WORKSPACE[test]


- name: Test with pytest
run: |
coverage run --source=. -m pytest
coverage report -m


- name: Coveralls
uses: AndreMiras/coveralls-python-action@develop
with:
2 changes: 1 addition & 1 deletion ceruleo/__init__.py
@@ -9,4 +9,4 @@
CACHE_PATH.mkdir(parents=True, exist_ok=True)


-__version__ = "2.0.6"
+__version__ = "3.0.0"
4 changes: 2 additions & 2 deletions ceruleo/dataset/analysis/correlation.py
@@ -2,12 +2,12 @@
from typing import List, Optional, Tuple

import pandas as pd
-from ceruleo.dataset.ts_dataset import AbstractTimeSeriesDataset
+from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features


def correlation_analysis(
-dataset: AbstractTimeSeriesDataset,
+dataset: AbstractPDMDataset,
corr_threshold: float = 0.7,
features: Optional[List[str]] = None,
) -> pd.DataFrame:
6 changes: 3 additions & 3 deletions ceruleo/dataset/analysis/distribution.py
@@ -4,7 +4,7 @@

import numpy as np
import pandas as pd
-from ceruleo.dataset.ts_dataset import AbstractTimeSeriesDataset
+from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features
from scipy.special import kl_div
from scipy.stats import wasserstein_distance
@@ -31,7 +31,7 @@ def histogram_per_life(
logger.info(f"Error {e} when computing the distribution for feature {feature}")


-def compute_bins(ds: AbstractTimeSeriesDataset, feature: str, number_of_bins: int = 15):
+def compute_bins(ds: AbstractPDMDataset, feature: str, number_of_bins: int = 15):
min_value = ds.get_features_of_life(0)[feature].min()
max_value = ds.get_features_of_life(0)[feature].max()

@@ -43,7 +43,7 @@ def compute_bins(ds: AbstractTimeSeriesDataset, feature: str, number_of_bins: in


def features_divergeces(
-ds: AbstractTimeSeriesDataset,
+ds: AbstractPDMDataset,
number_of_bins: int = 15,
columns: Optional[List[str]] = None,
show_progress: bool = False,
4 changes: 2 additions & 2 deletions ceruleo/dataset/analysis/numerical_features.py
@@ -10,7 +10,7 @@
from uncertainties import ufloat

from ceruleo.dataset.transformed import TransformedDataset
-from ceruleo.dataset.ts_dataset import AbstractLivesDataset
+from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features_and_target


@@ -184,7 +184,7 @@ def merge_analysis(data: dict) -> pd.DataFrame:


def analysis(
-dataset: Union[TransformedDataset, AbstractLivesDataset],
+dataset: Union[TransformedDataset, AbstractPDMDataset],
*,
show_progress: bool = False,
what_to_compute: List[str] = [],
9 changes: 4 additions & 5 deletions ceruleo/dataset/analysis/sample_rate.py
@@ -3,14 +3,13 @@

import numpy as np
import pandas as pd
-from ceruleo.dataset.ts_dataset import AbstractTimeSeriesDataset
+from ceruleo.dataset.ts_dataset import AbstractPDMDataset

logger = logging.getLogger(__name__)


-def sample_rate(ds: AbstractTimeSeriesDataset, unit: str = "s") -> np.ndarray:
-"""
-Obtain an array of time difference between two consecutive samples.
+def sample_rate(ds: AbstractPDMDataset, unit: str = "s") -> np.ndarray:
+"""Obtain an array of time difference between two consecutive samples

If the index it's a timestamp, the time difference will be converted to the provided unit

@@ -32,7 +31,7 @@ def sample_rate(ds: AbstractTimeSeriesDataset, unit: str = "s") -> np.ndarray:


def sample_rate_summary(
-ds: AbstractTimeSeriesDataset, unit: Optional[str] = "s"
+ds: AbstractPDMDataset, unit: Optional[str] = "s"
) -> pd.DataFrame:
"""
Obtain the mean, mode and standard deviation of the sample rate of the dataset
Empty file.
180 changes: 180 additions & 0 deletions ceruleo/dataset/builder/builder.py
@@ -0,0 +1,180 @@
import logging
import os
from pathlib import Path
from typing import Callable, List, Optional, Tuple, Union


import pandas as pd
from tqdm.auto import tqdm

from ceruleo.dataset.builder.cycles_splitter import (
CyclesSplitter,
FailureDataCycleSplitter,
)
from ceruleo.dataset.builder.output import OutputMode
from ceruleo.dataset.builder.rul_column import RULColumn
from ceruleo.dataset.ts_dataset import PDMDataset

logger = logging.getLogger(__name__)


def load_dataframe(path: Union[str, Path]) -> pd.DataFrame:
if isinstance(path, str):
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"File {path} does not exist")
if path.suffix == ".csv":
return pd.read_csv(path)
if path.suffix == ".parquet":
return pd.read_parquet(path)
if path.suffix == ".xlsx":
return pd.read_excel(path)
raise ValueError(f"Unsupported file format {path.suffix}")


class DatasetBuilder:
splitter: CyclesSplitter
output_mode: OutputMode
rul_column: RULColumn
dataframe_loader: Callable[[Union[str, Path]], pd.DataFrame]
index_column: Optional[str]

def __init__(
self,
dataframe_loader: Callable[[Union[str, Path]], pd.DataFrame] = load_dataframe,
):
"""Initializes the builder."""
self.output_mode = None
self.splitter = None
self.dataframe_loader = dataframe_loader
self.index_column = None
self.rul_column = None

@staticmethod
def one_file_format():
return DatasetBuilder()

def set_splitting_method(self, splitter: CyclesSplitter):
self.splitter = splitter
return self

def set_index_column(self, index_column: str):
self.index_column = index_column
return self

def set_machine_id_feature(self, name: str):
self._machine_type_feature = name
return self

def set_rul_column_method(self, rul_column: RULColumn):
self.rul_column = rul_column
return self

def set_output_mode(self, output_mode: OutputMode):
self.output_mode = output_mode
return self

def _validate(self):
if self.output_mode is None:
raise ValueError("Output mode not set")
if self.splitter is None:
raise ValueError("Splitting method not set")

def build(self, input_path: Path):
self._validate()
self.splitter.split(input_path, self.output_mode)

def prepare_from_data_fault_pairs_files(
self, data_fault_pairs: Union[Tuple[str, str], List[Tuple[str, str]]]
):
if not isinstance(data_fault_pairs, list):
data_fault_pairs = [data_fault_pairs]

if not isinstance(self.splitter, FailureDataCycleSplitter):
raise ValueError(
"This method is only available for FailureDataCycleSplitter"
)

common_path_prefix = os.path.commonprefix(
[data for data, fault in data_fault_pairs]
)

for i, (data, fault) in enumerate(tqdm(data_fault_pairs)):
df_data = self.dataframe_loader(data)
df_faults = self.dataframe_loader(fault)
cycles_in_file = self.splitter.split(df_data, df_faults)
for j, ds in enumerate(cycles_in_file):
cycle_id = f"{i+1}_{j+1}"
self._build_and_store_cycle(
ds,
cycle_id,
metadata={
"Raw Data Filename": str(data.relative_to(common_path_prefix)),
"Raw Fault Filename": str(
fault.relative_to(common_path_prefix)
),
},
)
self.output_mode.finish()

def build_from_data_fault_pairs_files(
self, data_fault_pairs: Union[Tuple[str, str], List[Tuple[str, str]]]
) -> PDMDataset:
self.prepare_from_data_fault_pairs_files(data_fault_pairs)
return self.output_mode.build_dataset(self)

def prepare_from_df(
self, data: Union[pd.DataFrame, List[pd.DataFrame]]
) -> PDMDataset:
if not isinstance(data, list):
data = [data]
self._validate()
for i, data_element in enumerate(data):
for j, ds in enumerate(self.splitter.split(data_element)):
cycle_id = f"{i+1}_{j+1}"
self._build_and_store_cycle(ds, cycle_id)
self.output_mode.finish()

def build_from_df(self, data: Union[pd.DataFrame, List[pd.DataFrame]]):
self.prepare_from_df(data)
return self.output_mode.build_dataset(self)

def prepare_from_data_fault_pair(
self,
data_fault_pairs: Union[
Tuple[pd.DataFrame, pd.DataFrame], List[Tuple[pd.DataFrame, pd.DataFrame]]
],
):
if not isinstance(data_fault_pairs, list):
data_fault_pairs = [data_fault_pairs]

if not isinstance(self.splitter, FailureDataCycleSplitter):
raise ValueError(
"This method is only available for FailureDataCycleSplitter"
)
for i, (data, fault) in enumerate(tqdm(data_fault_pairs)):
cycles_in_file = self.splitter.split(data, fault)
for j, ds in enumerate(cycles_in_file):
cycle_id = f"{i+1}_{j+1}"
self._build_and_store_cycle(
ds,
cycle_id,
)
self.output_mode.finish()

def build_from_data_fault_pair(
self,
data_fault_pairs: Union[
Tuple[pd.DataFrame, pd.DataFrame], List[Tuple[pd.DataFrame, pd.DataFrame]]
],
) -> PDMDataset:
self.prepare_from_data_fault_pair(data_fault_pairs)
return self.output_mode.build_dataset(self)

def _build_and_store_cycle(
self, ds: pd.DataFrame, cycle_id: any, metadata: dict = {}
):
ds["RUL"] = self.rul_column.get(ds)
if self.index_column is not None:
ds.set_index(self.index_column, inplace=True)
self.output_mode.store(f"Cycle_{cycle_id}", ds, metadata)
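The new `DatasetBuilder` is a fluent API: each `set_*` call returns `self`, and each `build_from_*` method validates the configuration, splits the raw data into run-to-failure cycles, appends a RUL column, and hands every cycle to the configured output mode. Below is a minimal usage sketch. Since the concrete `CyclesSplitter`, `RULColumn`, and `OutputMode` implementations shipped by this PR live in files not shown in this diff, the three small classes in the sketch are hypothetical duck-typed stand-ins that implement only the calls `builder.py` makes on them.

```python
# Minimal usage sketch of the DatasetBuilder introduced in this PR.
# SplitOnCycleColumn, RowsToEndRUL and InMemoryOutput are hypothetical
# stand-ins: the real CyclesSplitter / RULColumn / OutputMode classes live in
# ceruleo/dataset/builder/{cycles_splitter,rul_column,output}.py, which are
# not part of this diff excerpt.
import numpy as np
import pandas as pd

from ceruleo.dataset.builder.builder import DatasetBuilder


class SplitOnCycleColumn:
    """Cut a new run-to-failure cycle wherever the 'cycle' column changes."""

    def split(self, df: pd.DataFrame):
        return [g.reset_index(drop=True) for _, g in df.groupby("cycle")]


class RowsToEndRUL:
    """RUL stand-in: remaining useful life counted in rows until the cycle ends."""

    def get(self, cycle: pd.DataFrame) -> np.ndarray:
        return np.arange(len(cycle) - 1, -1, -1)


class InMemoryOutput:
    """Output-mode stand-in: keep the processed cycles in a plain dict."""

    def __init__(self):
        self.cycles = {}

    def store(self, name: str, cycle: pd.DataFrame, metadata: dict) -> None:
        self.cycles[name] = cycle

    def finish(self) -> None:
        pass

    def build_dataset(self, builder: DatasetBuilder):
        # The real OutputMode would return a PDMDataset here.
        return self.cycles


raw = pd.DataFrame(
    {"cycle": [1, 1, 1, 2, 2], "temperature": [30.0, 31.5, 35.2, 29.8, 33.1]}
)

cycles = (
    DatasetBuilder()
    .set_splitting_method(SplitOnCycleColumn())
    .set_rul_column_method(RowsToEndRUL())
    .set_output_mode(InMemoryOutput())
    .build_from_df(raw)
)
# cycles holds "Cycle_1_1" (3 rows, RUL 2,1,0) and "Cycle_1_2" (2 rows, RUL 1,0)
```

`build_from_data_fault_pair` and `build_from_data_fault_pairs_files` follow the same pattern but require a `FailureDataCycleSplitter`, whose `split` takes the sensor data and the fault table as separate inputs.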