diff --git a/paddlets/pipeline/pipeline.py b/paddlets/pipeline/pipeline.py
index f595b1af..34c9a00b 100644
--- a/paddlets/pipeline/pipeline.py
+++ b/paddlets/pipeline/pipeline.py
@@ -7,12 +7,13 @@
 import pandas as pd
 import pickle
 
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Union
 
 from paddlets.models.base import Trainable
 from paddlets.datasets.tsdataset import TSDataset, TimeSeries
 from paddlets.logger import Logger, raise_if_not, raise_if, raise_log
 from paddlets.logger.logger import log_decorator
 from paddlets.models.model_loader import load as paddlets_model_load
+from paddlets.utils.utils import get_tsdataset_max_len, split_dataset
 
 logger = Logger(__name__)
@@ -96,25 +97,59 @@ def fit(
         self._fitted = True
         return self
 
-    def transform(self, tsdataset: TSDataset, inplace: bool = False) -> TSDataset:
+    def transform(self,
+                  tsdataset: TSDataset,
+                  inplace: bool = False,
+                  cache_transform_steps: bool = False,
+                  previous_caches: List[TSDataset] = None) -> Union[TSDataset, Tuple[TSDataset, List[TSDataset]]]:
         """
         Transform the `TSDataset` using the fitted transformers in the pipeline.
 
         Args:
             tsdataset(TSDataset): Data to be transformed.
             inplace(bool): Set to True to perform inplace transform and avoid a data copy. Default is False.
+            cache_transform_steps(bool): Set to True to cache each transform step's result in a list. Default is False.
+            previous_caches(List[TSDataset]): Previously cached per-step transform results to prepend before each step.
 
         Returns:
-            TSDataset: Transformed results.
+            Union[TSDataset, Tuple[TSDataset, List[TSDataset]]]: The transformed result by default; if
+            cache_transform_steps is True, a tuple of the transformed result and each transform step's cache.
         """
         self._check_fitted()
         tsdataset_transformed = tsdataset
         if not inplace:
             tsdataset_transformed = tsdataset.copy()
+
         # Transform
-        for transform in self._transform_list:
-            tsdataset_transformed = transform.transform(tsdataset_transformed)
-        return tsdataset_transformed
+        transform_list_length = len(self._transform_list)
+        # Initialize the per-step caches with the same length as the transform list, filled with None
+        transform_caches = [None] * transform_list_length
+        # the first transformer's cache is the original data
+        if cache_transform_steps and self._transform_list[0].need_previous_data:
+            transform_caches[0] = tsdataset_transformed
+
+        for i in range(transform_list_length):
+            transformed_data_len = get_tsdataset_max_len(tsdataset_transformed)
+            data_pre = previous_caches[i] if previous_caches else None
+            transformer = self._transform_list[i]
+
+            if data_pre:
+                tsdataset_transformed = TSDataset.concat([data_pre, tsdataset_transformed])
+            tsdataset_transformed = transformer.transform_n_rows(tsdataset_transformed, transformed_data_len)
+
+            # caches
+            if cache_transform_steps:
+                next_transformer_index = i + 1
+                last_transformer_index = transform_list_length - 1
+                # the final transformer has no next transformer, break
+                if i == last_transformer_index:
+                    break
+                # the next transformer's cache is this transformer's result
+                if self._transform_list[next_transformer_index].need_previous_data:
+                    transform_caches[next_transformer_index] = tsdataset_transformed
+
+        res = tsdataset_transformed
+        return (res, transform_caches) if cache_transform_steps else res
 
     def inverse_transform(self, tsdataset: TSDataset, inplace: bool=False) -> TSDataset:
         """
@@ -140,7 +175,7 @@ def inverse_transform(self, tsdataset: TSDataset, inplace: bool=False) -> TSData
                 tmp_ts = transform.inverse_transform(tsdataset_transformed)
                 tsdataset_transformed = tmp_ts
             except NotImplementedError:
-                logger.info("%s not implement inverse_transform, continue" % (transform.__class__.__name__))
+                logger.debug("%s does not implement inverse_transform, continue" % (transform.__class__.__name__))
                 continue
             except Exception as e:
                 raise_log(RuntimeError("error occurred while inverse_transform, error: %s" % (str(e))))
@@ -311,18 +346,56 @@ def _recursive_predict(
                     freq=tsdataset_copy.get_observed_cov().time_index.freq),
                 fill_value=fill_value
             )
+        target_length = len(tsdataset_copy.target)
+
+        # feature processing on the historical (pre) data
+        if tsdataset_copy.known_cov:
+            pre_data, _ = split_dataset(tsdataset_copy, target_length + self._model._out_chunk_len)
+        else:
+            pre_data = tsdataset_copy
+        data_pre_transformed, data_pre_transformed_caches = self.transform(tsdataset=pre_data,
+                                                                           cache_transform_steps=True)
+
         results = []
-        for _ in range(recursive_rounds):
-            tsdataset_tmp = tsdataset_copy.copy()
-            # Predict
-            output = None
+
+        # recursive predict start
+        for i in range(recursive_rounds):
+
+            # predict
             if need_proba == True:
-                output = self.predict_proba(tsdataset_tmp)
+                predictions = self._model.predict_proba(data_pre_transformed)
             else:
-                output = self.predict(tsdataset_tmp)
-            # Update data using predicted value
-            tsdataset_copy = TSDataset.concat([tsdataset_copy, output])
-            results.append(output)
+                predictions = self._model.predict(data_pre_transformed)
+            predictions = self.inverse_transform(predictions)
+            results.append(predictions)
+            # break in the last round
+            if i == recursive_rounds - 1:
+                break
+
+            # concat the predictions to the original data
+            tsdataset_copy = TSDataset.concat([tsdataset_copy, predictions], keep="last")
+            target_length = target_length + self._model._out_chunk_len
+
+            # split out the newly predicted chunk
+            _, new_chunk = tsdataset_copy.split(target_length - self._model._out_chunk_len)
+            if tsdataset_copy.known_cov:
+                new_chunk, _ = split_dataset(new_chunk, 2 * self._model._out_chunk_len)
+
+            # transform only the new chunk, re-using the cached per-step results
+            chunk_transformed, chunk_transformed_caches = self.transform(new_chunk,
+                                                                         cache_transform_steps=True,
+                                                                         previous_caches=data_pre_transformed_caches,
+                                                                         inplace=False)
+
+            # concat the transform results
+            data_pre_transformed = TSDataset.concat([data_pre_transformed, chunk_transformed], keep="last")
+
+            # concat the transform caches
+            for j in range(len(data_pre_transformed_caches)):
+                if data_pre_transformed_caches[j]:
+                    data_pre_transformed_caches[j] = TSDataset.concat(
+                        [data_pre_transformed_caches[j], chunk_transformed_caches[j]])
+
         # Concat results
         result = TSDataset.concat(results)
         # Resize result
@@ -330,6 +403,7 @@ def _recursive_predict(
             TimeSeries(result.get_target().data[0: predict_length], result.freq)
         )
         return result
+
     def save(self, path: str, pipeline_file_name: str = "pipeline-partial.pkl",
              model_file_name: str = "paddlets_model"):
         """
@@ -429,4 +503,9 @@ def _check_recursive_predict_valid(self, predict_length: int, need_proba: bool =
         raise_if(self._model._skip_chunk_len != 0, f"recursive_predict not supported when \
             _skip_chunk_len!=0, got {self._model._skip_chunk_len}.")
         raise_if(predict_length <= 0, f"predict_length must be > \
-            0, got {predict_length}.")
\ No newline at end of file
+            0, got {predict_length}.")
+
+    @property
+    def steps(self):
+        return self._steps
+
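Below is an illustrative usage sketch (not part of the patch) of the new cache_transform_steps / previous_caches arguments on Pipeline.transform; the fixture mirrors the unit tests that follow, and the model parameters are placeholders only.

    # Illustrative sketch: exercising the new caching arguments directly, the
    # way _recursive_predict above uses them internally.
    import numpy as np
    import pandas as pd

    from paddlets.datasets.tsdataset import TimeSeries, TSDataset
    from paddlets.models.forecasting import MLPRegressor
    from paddlets.pipeline.pipeline import Pipeline
    from paddlets.transform import KSigma, TimeFeatureGenerator

    np.random.seed(2022)
    target = TimeSeries.load_from_dataframe(
        pd.Series(np.random.randn(2000).astype(np.float32),
                  index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
                  name="a"))
    known_cov = TimeSeries.load_from_dataframe(
        pd.DataFrame(np.random.randn(2500, 2).astype(np.float32),
                     index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
                     columns=["b1", "c1"]))
    tsdataset = TSDataset(target=target, known_cov=known_cov)

    pipe = Pipeline([(KSigma, {"cols": ["b1"], "k": 0.5}),
                     (TimeFeatureGenerator, {}),
                     (MLPRegressor, {"in_chunk_len": 7 * 96 + 20 * 4,
                                     "out_chunk_len": 96,
                                     "skip_chunk_len": 0})])
    pipe.fit(tsdataset, tsdataset)

    # Default behaviour is unchanged: a single transformed TSDataset is returned.
    transformed = pipe.transform(tsdataset)

    # With cache_transform_steps=True each step's intermediate result is returned
    # as well, so later calls can transform only a freshly appended chunk while
    # re-using the cached history (this is what _recursive_predict does per round).
    transformed, caches = pipe.transform(tsdataset, cache_transform_steps=True)
    res = pipe.recursive_predict(tsdataset, predict_length=201)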
diff --git a/paddlets/tests/pipeline/test_pipeline.py b/paddlets/tests/pipeline/test_pipeline.py
index e8c0ec1c..79c4efb8 100644
--- a/paddlets/tests/pipeline/test_pipeline.py
+++ b/paddlets/tests/pipeline/test_pipeline.py
@@ -1,14 +1,15 @@
 # !/usr/bin/env python3
 # -*- coding:utf-8 -*-
 import os
 import unittest
+import shutil
 import pandas as pd
 import numpy as np
 
 from unittest import TestCase
 
 from paddlets.models.forecasting import MLPRegressor
-from paddlets.transform import KSigma, TimeFeatureGenerator, StandardScaler
+from paddlets.transform import KSigma, TimeFeatureGenerator, StandardScaler, StatsTransform, Fill
 from paddlets.datasets.tsdataset import TimeSeries, TSDataset
 from paddlets.pipeline.pipeline import Pipeline
 from paddlets.utils import get_uuid
@@ -205,14 +206,105 @@ def test_recursive_predict(self):
         static_cov = {"f": 1, "g": 2}
         tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
         transform_params = {"cols": ['b1'], "k": 0.5}
-        transform_params_1 = {"cols": ['c1'], "k": 0.7}
+        transform_params_1 = {"cols": ['c1']}
         nn_params = {
             'in_chunk_len': 7 * 96 + 20 * 4,
             'out_chunk_len': 96,
             'skip_chunk_len': 0,
             'eval_metrics': ["mse", "mae"]
         }
-        pipe = Pipeline([(KSigma, transform_params), (TimeFeatureGenerator, {}), (KSigma, transform_params_1), \
+
+        # case1
+        pipe = Pipeline([(KSigma, transform_params), (TimeFeatureGenerator, {}), (StatsTransform, transform_params_1), \
+                         (MLPRegressor, nn_params)])
+        pipe.fit(tsdataset, tsdataset)
+        res = pipe.recursive_predict(tsdataset, 201)
+        self.assertIsInstance(res, TSDataset)
+        self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))
+        # test recursive predict proba bad case
+        with self.assertRaises(ValueError):
+            res = pipe.recursive_predict_proba(tsdataset, 202)
+        # recursive predict bad case
+        # unsupported index type
+        tsdataset.get_target().reindex(
+            pd.CategoricalIndex(["a", "b", "c", "a", "b", "c"]),
+            fill_value=np.nan
+        )
+        with self.assertRaises(Exception):
+            res = pipe.recursive_predict(tsdataset, 201)
+
+        # case2
+        np.random.seed(2022)
+
+        target = TimeSeries.load_from_dataframe(
+            pd.Series(np.random.randn(2000).astype(np.float32),
+                      index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
+                      name="a"
+                      ))
+
+        observed_cov = TimeSeries.load_from_dataframe(
+            pd.DataFrame(
+                np.random.randn(2000, 2).astype(np.float32),
+                index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
+                columns=["b", "c"]
+            ))
+        known_cov = TimeSeries.load_from_dataframe(
+            pd.DataFrame(
+                np.random.randn(2500, 2).astype(np.float32),
+                index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
+                columns=["b1", "c1"]
+            ))
+        static_cov = {"f": 1, "g": 2}
+        tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
+        transform_params = {"cols": ['b1'], "k": 0.5}
+        nn_params = {
+            'in_chunk_len': 7 * 96 + 20 * 4,
+            'out_chunk_len': 96,
+            'skip_chunk_len': 0,
+            'eval_metrics': ["mse", "mae"]
+        }
+
+        transform_params_1 = {"cols": ['c1'], "end": 10}
+        pipe = Pipeline([(StatsTransform, transform_params_1), (KSigma, transform_params), (TimeFeatureGenerator, {}), (StatsTransform, transform_params_1), \
+                         (MLPRegressor, nn_params)])
+        pipe.fit(tsdataset, tsdataset)
+        res = pipe.recursive_predict(tsdataset, 201)
+        self.assertIsInstance(res, TSDataset)
+        self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))
+
+        # case3
+        np.random.seed(2022)
+
+        target = TimeSeries.load_from_dataframe(
+            pd.Series(np.random.randn(2000).astype(np.float32),
+                      index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
+                      name="a"
+                      ))
+
+        observed_cov = TimeSeries.load_from_dataframe(
+            pd.DataFrame(
+                np.random.randn(2000, 2).astype(np.float32),
+                index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
+                columns=["b", "c"]
+            ))
+        known_cov = TimeSeries.load_from_dataframe(
+            pd.DataFrame(
+                np.random.randn(2500, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2500, freq="15T"), + columns=["b1", "c1"] + )) + static_cov = {"f": 1, "g": 2} + tsdataset = TSDataset(target, observed_cov, known_cov, static_cov) + transform_params = {"cols": ['b1'], "k": 0.5} + transform_params_1 = {"cols": ['c1']} + nn_params = { + 'in_chunk_len': 7 * 96 + 20 * 4, + 'out_chunk_len': 96, + 'skip_chunk_len': 0, + 'eval_metrics': ["mse", "mae"] + } + pipe = Pipeline([(StatsTransform, transform_params_1),\ (MLPRegressor, nn_params)]) pipe.fit(tsdataset, tsdataset) res = pipe.recursive_predict(tsdataset, 201) @@ -230,6 +322,54 @@ def test_recursive_predict(self): with self.assertRaises(Exception): res = pipe.recursive_predict(tsdataset, 201) + #case4 + np.random.seed(2022) + + target = TimeSeries.load_from_dataframe( + pd.Series(np.random.randn(2000).astype(np.float32), + index=pd.date_range("2022-01-01", periods=2000, freq="15T"), + name="a" + )) + + observed_cov = TimeSeries.load_from_dataframe( + pd.DataFrame( + np.random.randn(2000, 2).astype(np.float32), + index=pd.date_range("2022-01-01", periods=2000, freq="15T"), + columns=["b", "c"] + )) + known_cov = TimeSeries.load_from_dataframe( + pd.DataFrame( + np.random.randn(2500, 2).astype(np.float32), + index=pd.date_range("2022-01-01", periods=2500, freq="15T"), + columns=["b1", "c1"] + )) + static_cov = {"f": 1, "g": 2} + tsdataset = TSDataset(target, observed_cov, known_cov, static_cov) + transform_params = {"cols": ['b1'], "k": 0.5} + transform_params_1 = {"cols": ['c1']} + nn_params = { + 'in_chunk_len': 7 * 96 + 20 * 4, + 'out_chunk_len': 96, + 'skip_chunk_len': 0, + 'eval_metrics': ["mse", "mae"] + } + pipe = Pipeline([(StatsTransform, transform_params_1),(KSigma, transform_params), (TimeFeatureGenerator, {}), (Fill, {"cols": ['c1']}), \ + (MLPRegressor, nn_params)]) + pipe.fit(tsdataset, tsdataset) + res = pipe.recursive_predict(tsdataset, 201) + self.assertIsInstance(res, TSDataset) + self.assertEqual(res.get_target().to_dataframe().shape, (201, 1)) + # test recursive predict proba bad case + with self.assertRaises(ValueError): + res = pipe.recursive_predict_proba(tsdataset, 202) + # recursive predict bad case + # unsupported index type + tsdataset.get_target().reindex( + pd.CategoricalIndex(["a", "b", "c", "a", "b", "c"]), + fill_value=np.nan + ) + with self.assertRaises(Exception): + res = pipe.recursive_predict(tsdataset, 201) def test_predict_proba(self): """ unittest function @@ -333,4 +473,5 @@ def test_save_and_load(self): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() + diff --git a/paddlets/utils/utils.py b/paddlets/utils/utils.py index 1008fc1f..a351b23d 100644 --- a/paddlets/utils/utils.py +++ b/paddlets/utils/utils.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 # -*- coding: UTF-8 -*- + +from numbers import Integral import uuid import hashlib from inspect import isclass import pandas as pd -from paddlets.logger.logger import raise_log from paddlets.models.base import Trainable -from paddlets.pipeline import Pipeline -from paddlets.models.forecasting.dl.paddle_base import PaddleBaseModel -from paddlets.logger import raise_if_not +from paddlets.logger import raise_if_not, raise_if, raise_log +from paddlets.datasets.tsdataset import TSDataset def check_model_fitted(model: Trainable, msg: str = None): @@ -32,7 +32,8 @@ def check_model_fitted(model: Trainable, msg: str = None): Raise: ValueError """ - + from paddlets.pipeline import Pipeline + from paddlets.models.forecasting.dl.paddle_base import PaddleBaseModel 
     #不需要fit的模型列表
     MODEL_NEED_NO_FIT = ["ArimaModel"]
     if model.__class__.__name__ in MODEL_NEED_NO_FIT:
@@ -64,7 +65,7 @@ def get_uuid(prefix: str = "", suffix: str = ""):
     Args:
         prefix(str, optional): The prefix of the returned string.
         suffix(str, optional): The suffix of the returned string.
-    
+
     Returns:
         str: String of 16 characters.
     """
@@ -78,6 +79,7 @@ def get_uuid(prefix: str = "", suffix: str = ""):
     res = prefix + suffix if suffix is not None else prefix
     return res
 
+
 def check_train_valid_continuity(train_data: TSDataset, valid_data: TSDataset)-> bool:
     """
     Check if train and test TSDataset are continous
@@ -104,3 +106,119 @@ def check_train_valid_continuity(train_data: TSDataset, valid_data: TSDataset)->
         raise_log("Unsupport data index format")
 
     return continuious
+
+
+def split_dataset(dataset: TSDataset, split_point: int) -> TSDataset:
+    """
+    Split a dataset at the given position (counted over the union of all series indexes).
+
+    Args:
+        dataset(TSDataset): dataset to be split.
+        split_point(int): split point.
+
+    Return:
+        Tuple[TSDataset, TSDataset]
+
+    """
+    target_index = None
+    observed_index = None
+    known_index = None
+    index_list = []
+
+    if dataset.target:
+        target_index = dataset.target.data.index
+        index_list.append(target_index)
+    if dataset.known_cov:
+        known_index = dataset.known_cov.data.index
+        index_list.append(known_index)
+    if dataset.observed_cov:
+        observed_index = dataset.observed_cov.data.index
+        index_list.append(observed_index)
+
+    # sort by start time so the concatenated union index is in order
+    index_list.sort(key=lambda x: x[0])
+
+    all_index = pd.concat([x.to_series() for x in index_list]).index.drop_duplicates()
+    max_len = len(all_index)
+
+    raise_if_not(isinstance(split_point, Integral),
+                 f"split point should be an Integral type, instead of {type(split_point)}")
+    raise_if(split_point <= 0, "split point should be > 0")
+    raise_if(split_point >= max_len, "split point should be smaller than the dataset length")
+    split_index = all_index[split_point - 1]
+
+    target_pre = None
+    target_after = None
+    if dataset.target:
+
+        if split_index < target_index[0]:
+            target_after = dataset.target
+        elif split_index >= target_index[-1]:
+            target_pre = dataset.target
+        elif split_index in target_index:
+            if isinstance(dataset.target.data.index, pd.RangeIndex):
+                target_pre, target_after = dataset.target.split(int((split_index - dataset.target.data.index[0]) / dataset.target.data.index.step + 1))
+            else:
+                target_pre, target_after = dataset.target.split(split_index)
+
+    known_pre = None
+    known_after = None
+    if dataset.known_cov:
+        if split_index < known_index[0]:
+            known_after = dataset.known_cov
+        elif split_index >= known_index[-1]:
+            known_pre = dataset.known_cov
+        elif split_index in known_index:
+            if isinstance(dataset.known_cov.data.index, pd.RangeIndex):
+                known_pre, known_after = dataset.known_cov.split(int((split_index - dataset.known_cov.data.index[0]) / dataset.known_cov.data.index.step + 1))
+            else:
+                known_pre, known_after = dataset.known_cov.split(split_index)
+
+    observed_pre = None
+    observed_after = None
+    if dataset.observed_cov:
+        if split_index < observed_index[0]:
+            observed_after = dataset.observed_cov
+        elif split_index >= observed_index[-1]:
+            observed_pre = dataset.observed_cov
+        elif split_index in observed_index:
+            if isinstance(dataset.observed_cov.data.index, pd.RangeIndex):
+                observed_pre, observed_after = dataset.observed_cov.split(int((split_index - dataset.observed_cov.data.index[0]) / dataset.observed_cov.data.index.step + 1))
+            else:
+                observed_pre, observed_after = dataset.observed_cov.split(split_index)
+    return (TSDataset(target_pre, observed_pre, known_pre, dataset.static_cov),
+            TSDataset(target_after, observed_after, known_after, dataset.static_cov))
+
+
+def get_tsdataset_max_len(dataset: TSDataset) -> int:
+    """
+    Get the dataset's max length.
+
+    Args:
+        dataset(TSDataset): dataset used to get the length.
+
+    Return:
+        int
+
+    """
+    target_index = None
+    observed_index = None
+    known_index = None
+    index_list = []
+
+    if dataset.target:
+        target_index = dataset.target.data.index
+        index_list.append(target_index)
+    if dataset.known_cov:
+        known_index = dataset.known_cov.data.index
+        index_list.append(known_index)
+    if dataset.observed_cov:
+        observed_index = dataset.observed_cov.data.index
+        index_list.append(observed_index)
+
+    # sort by start time so the concatenated union index is in order
+    index_list.sort(key=lambda x: x[0])
+
+    all_index = pd.concat([x.to_series() for x in index_list]).index.drop_duplicates()
+
+    return len(all_index)
+
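For reference, a minimal sketch (not part of the patch) of how the two new helpers in paddlets/utils/utils.py are meant to behave; the shapes and the expected values in the comments follow the implementation above and are illustrative only.

    # Illustrative sketch: split_dataset counts positions in the union of all
    # series indexes, so a series that ends before the split point goes
    # entirely into the first half.
    import numpy as np
    import pandas as pd

    from paddlets.datasets.tsdataset import TimeSeries, TSDataset
    from paddlets.utils.utils import get_tsdataset_max_len, split_dataset

    target = TimeSeries.load_from_dataframe(
        pd.Series(np.random.randn(100).astype(np.float32),
                  index=pd.date_range("2022-01-01", periods=100, freq="15T"),
                  name="a"))
    known_cov = TimeSeries.load_from_dataframe(
        pd.DataFrame(np.random.randn(150, 1).astype(np.float32),
                     index=pd.date_range("2022-01-01", periods=150, freq="15T"),
                     columns=["b1"]))
    dataset = TSDataset(target=target, known_cov=known_cov)

    # Union of the target index (100 points) and the known_cov index (150 points).
    print(get_tsdataset_max_len(dataset))  # expected: 150

    # Split after position 120: the target ends before the split point, so the
    # whole target lands in the first part and only known_cov is actually cut
    # (expected: pre.known_cov -> 120 rows, after.known_cov -> 30 rows, after.target -> None).
    pre, after = split_dataset(dataset, 120)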