Pipeline recursive predict optimize #125

Merged: 1 commit, Sep 16, 2022
113 changes: 96 additions & 17 deletions paddlets/pipeline/pipeline.py
@@ -7,12 +7,13 @@
import pandas as pd
import pickle

from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Union
from paddlets.models.base import Trainable
from paddlets.datasets.tsdataset import TSDataset, TimeSeries
from paddlets.logger import Logger, raise_if_not, raise_if, raise_log
from paddlets.logger.logger import log_decorator
from paddlets.models.model_loader import load as paddlets_model_load
from paddlets.utils.utils import get_tsdataset_max_len, split_dataset

logger = Logger(__name__)

@@ -96,25 +97,59 @@ def fit(
self._fitted = True
return self

def transform(self, tsdataset: TSDataset, inplace: bool = False) -> TSDataset:
def transform(self,
tsdataset: TSDataset,
inplace: bool = False,
cache_transform_steps: bool = False,
previous_caches: List[TSDataset] = None) -> Union[TSDataset, Tuple[TSDataset, List[TSDataset]]]:
"""
Transform the `TSDataset` using the fitted transformers in the pipeline.

Args:
tsdataset(TSDataset): Data to be transformed.
inplace(bool): Set to True to perform inplace transform and avoid a data copy. Default is False.
cache_transform_steps(bool): Set to True to cache each transform step's result into a list. Default is False.
previous_caches(List[TSDataset]): Previously cached transform results, used as the preceding data for each step. Default is None.

Returns:
TSDataset: Transformed results.
Tuple[TSDataset, List[TSDataset]]: Both the transformed results and each transform step's caches, returned when cache_transform_steps=True.
"""
self._check_fitted()
tsdataset_transformed = tsdataset
if not inplace:
tsdataset_transformed = tsdataset.copy()

# Transform
for transform in self._transform_list:
tsdataset_transformed = transform.transform(tsdataset_transformed)
return tsdataset_transformed
transform_list_length = len(self._transform_list)
# Initialize the transform caches with the same length as the transform list, filled with None
transform_caches = [None] * transform_list_length
# The first transformer's cache is the original data
if cache_transform_steps and self._transform_list[0].need_previous_data:
transform_caches[0] = tsdataset_transformed

for i in range(transform_list_length):
transformed_data_len = get_tsdataset_max_len(tsdataset_transformed)
data_pre = previous_caches[i] if previous_caches else None
transformer = self._transform_list[i]

if data_pre:
tsdataset_transformed = TSDataset.concat([data_pre, tsdataset_transformed])
tsdataset_transformed = transformer.transform_n_rows(tsdataset_transformed, transformed_data_len)

# Cache results for the next transformer
if cache_transform_steps:
next_transformer_index = i + 1
last_transformer_index = transform_list_length - 1
# The final transformer has no next transformer, so stop caching
if i == last_transformer_index:
break
# The next transformer's cache is this transformer's result
if self._transform_list[next_transformer_index].need_previous_data:
transform_caches[next_transformer_index] = tsdataset_transformed

res = tsdataset_transformed
return (res, transform_caches) if cache_transform_steps else res
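
A minimal usage sketch of the cached transform path, assuming pipe is an already-fitted Pipeline and tsdataset / new_chunk are TSDataset instances (the names are illustrative, mirroring the tests below):

    # First call: transform the full history once and keep per-step caches.
    transformed, caches = pipe.transform(tsdataset, cache_transform_steps=True)

    # Later calls: transform only a new chunk, handing the caches back so that
    # transformers which need preceding rows can prepend them.
    chunk_transformed, chunk_caches = pipe.transform(
        new_chunk,
        cache_transform_steps=True,
        previous_caches=caches,
        inplace=False)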

def inverse_transform(self, tsdataset: TSDataset, inplace: bool=False) -> TSDataset:
"""
@@ -140,7 +175,7 @@ def inverse_transform(self, tsdataset: TSDataset, inplace: bool=False) -> TSDataset:
tmp_ts = transform.inverse_transform(tsdataset_transformed)
tsdataset_transformed = tmp_ts
except NotImplementedError:
logger.info("%s not implement inverse_transform, continue" % (transform.__class__.__name__))
logger.debug("%s not implement inverse_transform, continue" % (transform.__class__.__name__))
continue
except Exception as e:
raise_log(RuntimeError("error occurred while inverse_transform, error: %s" % (str(e))))
@@ -311,25 +346,64 @@ def _recursive_predict(
freq=tsdataset_copy.get_observed_cov().time_index.freq),
fill_value=fill_value
)
target_length = len(tsdataset_copy.target)

# Feature processing on the preceding data
if tsdataset_copy.known_cov:
pre_data, _ = split_dataset(tsdataset_copy, target_length + self._model._out_chunk_len)
else:
pre_data = tsdataset_copy
data_pre_transformed, data_pre_transformed_caches = self.transform(tsdataset=pre_data,
cache_transform_steps=True)

results = []
for _ in range(recursive_rounds):
tsdataset_tmp = tsdataset_copy.copy()
# Predict
output = None

# Recursive predict starts here
for i in range(recursive_rounds):

# Predict
if need_proba:
output = self.predict_proba(tsdataset_tmp)
predictions = self._model.predict_proba(data_pre_transformed)
else:
output = self.predict(tsdataset_tmp)
# Update data using predicted value
tsdataset_copy = TSDataset.concat([tsdataset_copy, output])
results.append(output)
predictions = self._model.predict(data_pre_transformed)
predictions = self.inverse_transform(predictions)
results.append(predictions)
# Break in the last round
if i == recursive_rounds - 1:
break

# Concat the prediction to the original data
tsdataset_copy = TSDataset.concat([tsdataset_copy, predictions], keep="last")
target_length = target_length + self._model._out_chunk_len

# Split out the newly predicted chunk
_, new_chunk = tsdataset_copy.split(target_length - self._model._out_chunk_len)
if tsdataset_copy.known_cov:
new_chunk, _ = split_dataset(new_chunk, 2 * self._model._out_chunk_len)

# Transform only the new chunk
chunk_transformed, chunk_transformed_caches = self.transform(new_chunk,
cache_transform_steps=True,
previous_caches=data_pre_transformed_caches,
inplace=False)

# Concat the transform results
data_pre_transformed = TSDataset.concat([data_pre_transformed, chunk_transformed], keep="last")

# Concat the transform caches
for i in range(len(data_pre_transformed_caches)):
if data_pre_transformed_caches[i]:
data_pre_transformed_caches[i] = TSDataset.concat(
[data_pre_transformed_caches[i], chunk_transformed_caches[i]])

# Concat results
result = TSDataset.concat(results)
# Resize result
result.set_target(
TimeSeries(result.get_target().data[0: predict_length], result.freq)
)
return result
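
The net effect of the rewrite above: the full history is transformed once up front, and each subsequent round transforms only the newly predicted chunk, reusing the per-step caches. A minimal end-to-end sketch (the setup mirrors test_recursive_predict below; parameter values are illustrative):

    nn_params = {
        'in_chunk_len': 7 * 96 + 20 * 4,
        'out_chunk_len': 96,
        'skip_chunk_len': 0,  # recursive_predict requires _skip_chunk_len == 0
        'eval_metrics': ["mse", "mae"]
    }
    pipe = Pipeline([(KSigma, {"cols": ['b1'], "k": 0.5}), (MLPRegressor, nn_params)])
    pipe.fit(tsdataset, tsdataset)
    # 201 is not a multiple of out_chunk_len (96), so the surplus rows from the
    # final round are trimmed by the "Resize result" step above.
    res = pipe.recursive_predict(tsdataset, 201)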


def save(self, path: str, pipeline_file_name: str = "pipeline-partial.pkl", model_file_name: str = "paddlets_model"):
"""
@@ -429,4 +503,9 @@ def _check_recursive_predict_valid(self, predict_length: int, need_proba: bool =
raise_if(self._model._skip_chunk_len != 0, f"recursive_predict not supported when \
_skip_chunk_len!=0, got {self._model._skip_chunk_len}.")
raise_if(predict_length <= 0, f"predict_length must be > \
0, got {predict_length}.")

@property
def steps(self):
"""The pipeline's configured steps."""
return self._steps
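
For reference, the new accessor simply exposes the steps list the pipeline was built from; a hypothetical illustration, with values taken from the tests below:

    pipe.steps  # e.g. [(KSigma, {"cols": ['b1'], "k": 0.5}), (MLPRegressor, nn_params)]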

151 changes: 146 additions & 5 deletions paddlets/tests/pipeline/test_pipeline.py
@@ -1,14 +1,15 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import unittest
import shutil

import pandas as pd
import numpy as np
from unittest import TestCase

from paddlets.models.forecasting import MLPRegressor
from paddlets.transform import KSigma, TimeFeatureGenerator, StandardScaler
from paddlets.transform import KSigma, TimeFeatureGenerator, StandardScaler, StatsTransform, Fill
from paddlets.datasets.tsdataset import TimeSeries, TSDataset
from paddlets.pipeline.pipeline import Pipeline
from paddlets.utils import get_uuid
@@ -205,14 +206,105 @@ def test_recursive_predict(self):
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
transform_params_1 = {"cols": ['c1'], "k": 0.7}
transform_params_1 = {"cols": ['c1']}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}
pipe = Pipeline([(KSigma, transform_params), (TimeFeatureGenerator, {}), (KSigma, transform_params_1), \

# case 1
pipe = Pipeline([(KSigma, transform_params), (TimeFeatureGenerator, {}), (StatsTransform, transform_params_1), \
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
self.assertIsInstance(res, TSDataset)
self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))
# test recursive predict proba bad case
with self.assertRaises(ValueError):
res = pipe.recursive_predict_proba(tsdataset, 202)
# recursive predict bad case
# unsupported index type
tsdataset.get_target().reindex(
pd.CategoricalIndex(["a", "b", "c", "a", "b", "c"]),
fill_value=np.nan
)
with self.assertRaises(Exception):
res = pipe.recursive_predict(tsdataset, 201)

# case 2
np.random.seed(2022)

target = TimeSeries.load_from_dataframe(
pd.Series(np.random.randn(2000).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
name="a"
))

observed_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2000, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
columns=["b", "c"]
))
known_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2500, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
columns=["b1", "c1"]
))
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}

transform_params_1 = {"cols": ['c1'], "end":10}
pipe = Pipeline([(StatsTransform, transform_params_1),(KSigma, transform_params), (TimeFeatureGenerator, {}), (StatsTransform, transform_params_1), \
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
self.assertIsInstance(res, TSDataset)
self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))


# case 3
np.random.seed(2022)

target = TimeSeries.load_from_dataframe(
pd.Series(np.random.randn(2000).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
name="a"
))

observed_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2000, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
columns=["b", "c"]
))
known_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2500, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
columns=["b1", "c1"]
))
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
transform_params_1 = {"cols": ['c1']}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}
pipe = Pipeline([(StatsTransform, transform_params_1),\
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
@@ -230,6 +322,54 @@ def test_recursive_predict(self):
with self.assertRaises(Exception):
res = pipe.recursive_predict(tsdataset, 201)

# case 4
np.random.seed(2022)

target = TimeSeries.load_from_dataframe(
pd.Series(np.random.randn(2000).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
name="a"
))

observed_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2000, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
columns=["b", "c"]
))
known_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2500, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
columns=["b1", "c1"]
))
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
transform_params_1 = {"cols": ['c1']}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}
pipe = Pipeline([(StatsTransform, transform_params_1),(KSigma, transform_params), (TimeFeatureGenerator, {}), (Fill, {"cols": ['c1']}), \
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
self.assertIsInstance(res, TSDataset)
self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))
# test recursive predict proba bad case
with self.assertRaises(ValueError):
res = pipe.recursive_predict_proba(tsdataset, 202)
# recursive predict bad case
# unsupported index type
tsdataset.get_target().reindex(
pd.CategoricalIndex(["a", "b", "c", "a", "b", "c"]),
fill_value=np.nan
)
with self.assertRaises(Exception):
res = pipe.recursive_predict(tsdataset, 201)

def test_predict_proba(self):
"""
unittest function
@@ -333,4 +473,5 @@ def test_save_and_load(self):


if __name__ == "__main__":
unittest.main()
