Pipeline recursive predict optimize (#125)
QGN123 authored Sep 16, 2022
1 parent 82f3901 commit 5899681
Showing 3 changed files with 366 additions and 28 deletions.
113 changes: 96 additions & 17 deletions paddlets/pipeline/pipeline.py
@@ -7,12 +7,13 @@
import pandas as pd
import pickle

from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Union
from paddlets.models.base import Trainable
from paddlets.datasets.tsdataset import TSDataset, TimeSeries
from paddlets.logger import Logger, raise_if_not, raise_if, raise_log
from paddlets.logger.logger import log_decorator
from paddlets.models.model_loader import load as paddlets_model_load
from paddlets.utils.utils import get_tsdataset_max_len, split_dataset

logger = Logger(__name__)

@@ -96,25 +97,59 @@ def fit(
self._fitted = True
return self

def transform(self, tsdataset: TSDataset, inplace: bool = False) -> TSDataset:
def transform(self,
tsdataset: TSDataset,
inplace: bool = False,
cache_transform_steps: bool = False,
previous_caches: Optional[List[TSDataset]] = None) -> Union[TSDataset, Tuple[TSDataset, List[TSDataset]]]:
"""
Transform the `TSDataset` using the fitted transformers in the pipeline.
Args:
tsdataset(TSDataset): Data to be transformed.
inplace(bool): Set to True to perform inplace transform and avoid a data copy. Default is False.
cache_transform_steps: Cache each transform step's transorm result into a list.
previous_caches : previous transform results cache
Returns:
TSDataset: Transformed results.
Tuple[TSDataset,Tuple[List[TSDataset],TSDataset]]: Return transformed results by default, Return Both transformed results
and each transform step's caches if set cache_transform_steps = True.
"""
self._check_fitted()
tsdataset_transformed = tsdataset
if not inplace:
tsdataset_transformed = tsdataset.copy()

# Transform
for transform in self._transform_list:
tsdataset_transformed = transform.transform(tsdataset_transformed)
return tsdataset_transformed
transform_list_length = len(self._transform_list)
# initialize the per-step caches with the same length as the transform list, filled with None
transform_caches = [None] * transform_list_length
# the first transformer's cache is the original data
if cache_transform_steps and self._transform_list[0].need_previous_data:
transform_caches[0] = tsdataset_transformed

for i in range(transform_list_length):
transformed_data_len = get_tsdataset_max_len(tsdataset_transformed)
data_pre = previous_caches[i] if previous_caches else None
transformer = self._transform_list[i]

if data_pre is not None:
tsdataset_transformed = TSDataset.concat([data_pre, tsdataset_transformed])
tsdataset_transformed = transformer.transform_n_rows(tsdataset_transformed, transformed_data_len)

# caches
if cache_transform_steps:
next_transformer_index = i + 1
last_transformer_index = transform_list_length - 1
# the final transformer has no next transformer, break
if i == last_transformer_index:
break
# the next transformer's cache is this transformer's result
if self._transform_list[next_transformer_index].need_previous_data:
transform_caches[next_transformer_index] = tsdataset_transformed

res = tsdataset_transformed
return (res, transform_caches) if cache_transform_steps else res
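# Editor's sketch (not part of this diff): intended use of the new caching
# arguments; `pipe` is assumed to be a fitted Pipeline, and `history` and
# `new_chunk` are hypothetical TSDataset objects.
#
#   transformed, caches = pipe.transform(history, cache_transform_steps=True)
#   # later, re-transform only the appended chunk; transformers with
#   # need_previous_data=True get their cached history prepended:
#   chunk_transformed, _ = pipe.transform(
#       new_chunk, cache_transform_steps=True, previous_caches=caches)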

def inverse_transform(self, tsdataset: TSDataset, inplace: bool=False) -> TSDataset:
"""
@@ -140,7 +175,7 @@ def inverse_transform(self, tsdataset: TSDataset, inplace: bool=False) -> TSDataset:
tmp_ts = transform.inverse_transform(tsdataset_transformed)
tsdataset_transformed = tmp_ts
except NotImplementedError:
logger.info("%s not implement inverse_transform, continue" % (transform.__class__.__name__))
logger.debug("%s not implement inverse_transform, continue" % (transform.__class__.__name__))
continue
except Exception as e:
raise_log(RuntimeError("error occurred while inverse_transform, error: %s" % (str(e))))
@@ -311,25 +346,64 @@ def _recursive_predict(
freq=tsdataset_copy.get_observed_cov().time_index.freq),
fill_value=fill_value
)
target_length = len(tsdataset_copy.target)

# feature process on the historical data; when known covariates exist, keep
# _out_chunk_len extra rows of known_cov so the first round has its future known covariates
if tsdataset_copy.known_cov:
pre_data, _ = split_dataset(tsdataset_copy, target_length + self._model._out_chunk_len)
else:
pre_data = tsdataset_copy
data_pre_transformed, data_pre_transformed_caches = self.transform(tsdataset=pre_data,
cache_transform_steps=True)

results = []
for _ in range(recursive_rounds):
tsdataset_tmp = tsdataset_copy.copy()
# Predict
output = None

# recursive predict start
for i in range(recursive_rounds):

# predict
if need_proba:
output = self.predict_proba(tsdataset_tmp)
predictions = self._model.predict_proba(data_pre_transformed)
else:
output = self.predict(tsdataset_tmp)
# Update data using predicted value
tsdataset_copy = TSDataset.concat([tsdataset_copy, output])
results.append(output)
predictions = self._model.predict(data_pre_transformed)
predictions = self.inverse_transform(predictions)
results.append(predictions)
# break in the last round
if i == recursive_rounds - 1:
break

# concat the new predictions back onto the original data
tsdataset_copy = TSDataset.concat([tsdataset_copy, predictions], keep="last")
target_length = target_length + self._model._out_chunk_len

# split off the newly predicted chunk
_, new_chunk = tsdataset_copy.split(target_length - self._model._out_chunk_len)
if tsdataset_copy.known_cov:
new_chunk, _ = split_dataset(new_chunk, 2 * self._model._out_chunk_len)

# transform only the new chunk, reusing the cached per-step history
chunk_transformed, chunk_transformed_caches = self.transform(new_chunk,
cache_transform_steps=True,
previous_caches=data_pre_transformed_caches,
inplace=False)

# concat the transformed chunk onto the transformed history
data_pre_transformed = TSDataset.concat([data_pre_transformed, chunk_transformed], keep="last")

# concat transform caches
for i in range(len(data_pre_transformed_caches)):
if data_pre_transformed_caches[i] is not None:
data_pre_transformed_caches[i] = TSDataset.concat(
[data_pre_transformed_caches[i], chunk_transformed_caches[i]])

# Concat results
result = TSDataset.concat(results)
# Resize result
result.set_target(
TimeSeries(result.get_target().data[0: predict_length], result.freq)
)
return result
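# Editor's sketch (an assumption consistent with the trimming above, not an
# excerpt of the diff): the round count this loop relies on is the ceiling
# of predict_length over the model's output chunk length.
#
#   import math
#   recursive_rounds = math.ceil(predict_length / self._model._out_chunk_len)
#   # each round appends _out_chunk_len points; set_target then trims the
#   # concatenated result back to exactly predict_length points.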


def save(self, path: str, pipeline_file_name: str = "pipeline-partial.pkl", model_file_name: str = "paddlets_model"):
"""
@@ -429,4 +503,9 @@ def _check_recursive_predict_valid(self, predict_length: int, need_proba: bool =
raise_if(self._model._skip_chunk_len != 0, f"recursive_predict not supported when \
_skip_chunk_len!=0, got {self._model._skip_chunk_len}.")
raise_if(predict_length <= 0, f"predict_length must be > \
0, got {predict_length}.")
0, got {predict_length}.")

@property
def steps(self):
"""The list of (class, params) step tuples this pipeline was constructed from."""
return self._steps
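For orientation, here is a minimal end-to-end sketch of the path this commit optimizes. The dataset shapes and parameters mirror the tests below; treat it as an illustrative assumption, not an excerpt of the diff:

import numpy as np
import pandas as pd

from paddlets.datasets.tsdataset import TimeSeries, TSDataset
from paddlets.models.forecasting import MLPRegressor
from paddlets.pipeline.pipeline import Pipeline
from paddlets.transform import KSigma

np.random.seed(2022)
target = TimeSeries.load_from_dataframe(
    pd.Series(np.random.randn(2000).astype(np.float32),
              index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
              name="a"))
observed_cov = TimeSeries.load_from_dataframe(
    pd.DataFrame(np.random.randn(2000, 2).astype(np.float32),
                 index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
                 columns=["b", "c"]))
known_cov = TimeSeries.load_from_dataframe(
    pd.DataFrame(np.random.randn(2500, 2).astype(np.float32),
                 index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
                 columns=["b1", "c1"]))
tsdataset = TSDataset(target, observed_cov, known_cov, {"f": 1, "g": 2})
pipe = Pipeline([(KSigma, {"cols": ["b1"], "k": 0.5}),
                 (MLPRegressor, {"in_chunk_len": 7 * 96 + 20 * 4,
                                 "out_chunk_len": 96,
                                 "skip_chunk_len": 0,
                                 "eval_metrics": ["mse", "mae"]})])
pipe.fit(tsdataset, tsdataset)
# each recursive round now transforms only the newly predicted chunk,
# reusing cached per-step results for the already-transformed history
res = pipe.recursive_predict(tsdataset, 201)  # res target shape: (201, 1)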

151 changes: 146 additions & 5 deletions paddlets/tests/pipeline/test_pipeline.py
@@ -1,14 +1,15 @@
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import unittest
import shutil

import pandas as pd
import numpy as np
from unittest import TestCase

from paddlets.models.forecasting import MLPRegressor
from paddlets.transform import KSigma, TimeFeatureGenerator, StandardScaler
from paddlets.transform import KSigma, TimeFeatureGenerator, StandardScaler, StatsTransform, Fill
from paddlets.datasets.tsdataset import TimeSeries, TSDataset
from paddlets.pipeline.pipeline import Pipeline
from paddlets.utils import get_uuid
@@ -205,14 +206,105 @@ def test_recursive_predict(self):
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
transform_params_1 = {"cols": ['c1'], "k": 0.7}
transform_params_1 = {"cols": ['c1']}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}
pipe = Pipeline([(KSigma, transform_params), (TimeFeatureGenerator, {}), (KSigma, transform_params_1), \

# case 1
pipe = Pipeline([(KSigma, transform_params), (TimeFeatureGenerator, {}), (StatsTransform, transform_params_1), \
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
self.assertIsInstance(res, TSDataset)
self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))
# test recursive predict proba bad case
with self.assertRaises(ValueError):
res = pipe.recursive_predict_proba(tsdataset, 202)
# recursive predict bad case
# unsupported index type
tsdataset.get_target().reindex(
pd.CategoricalIndex(["a", "b", "c", "a", "b", "c"]),
fill_value=np.nan
)
with self.assertRaises(Exception):
res = pipe.recursive_predict(tsdataset, 201)

# case 2
np.random.seed(2022)

target = TimeSeries.load_from_dataframe(
pd.Series(np.random.randn(2000).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
name="a"
))

observed_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2000, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
columns=["b", "c"]
))
known_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2500, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
columns=["b1", "c1"]
))
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}

transform_params_1 = {"cols": ['c1'], "end":10}
pipe = Pipeline([(StatsTransform, transform_params_1),(KSigma, transform_params), (TimeFeatureGenerator, {}), (StatsTransform, transform_params_1), \
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
self.assertIsInstance(res, TSDataset)
self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))


# case 3
np.random.seed(2022)

target = TimeSeries.load_from_dataframe(
pd.Series(np.random.randn(2000).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
name="a"
))

observed_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2000, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
columns=["b", "c"]
))
known_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2500, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
columns=["b1", "c1"]
))
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
transform_params_1 = {"cols": ['c1']}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}
pipe = Pipeline([(StatsTransform, transform_params_1),\
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
@@ -230,6 +322,54 @@ def test_recursive_predict(self):
with self.assertRaises(Exception):
res = pipe.recursive_predict(tsdataset, 201)

# case 4
np.random.seed(2022)

target = TimeSeries.load_from_dataframe(
pd.Series(np.random.randn(2000).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
name="a"
))

observed_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2000, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2000, freq="15T"),
columns=["b", "c"]
))
known_cov = TimeSeries.load_from_dataframe(
pd.DataFrame(
np.random.randn(2500, 2).astype(np.float32),
index=pd.date_range("2022-01-01", periods=2500, freq="15T"),
columns=["b1", "c1"]
))
static_cov = {"f": 1, "g": 2}
tsdataset = TSDataset(target, observed_cov, known_cov, static_cov)
transform_params = {"cols": ['b1'], "k": 0.5}
transform_params_1 = {"cols": ['c1']}
nn_params = {
'in_chunk_len': 7 * 96 + 20 * 4,
'out_chunk_len': 96,
'skip_chunk_len': 0,
'eval_metrics': ["mse", "mae"]
}
pipe = Pipeline([(StatsTransform, transform_params_1),(KSigma, transform_params), (TimeFeatureGenerator, {}), (Fill, {"cols": ['c1']}), \
(MLPRegressor, nn_params)])
pipe.fit(tsdataset, tsdataset)
res = pipe.recursive_predict(tsdataset, 201)
self.assertIsInstance(res, TSDataset)
self.assertEqual(res.get_target().to_dataframe().shape, (201, 1))
# test recursive predict proba bad case
with self.assertRaises(ValueError):
res = pipe.recursive_predict_proba(tsdataset, 202)
# recursive predict bad case
# unsupported index type
tsdataset.get_target().reindex(
pd.CategoricalIndex(["a", "b", "c", "a", "b", "c"]),
fill_value=np.nan
)
with self.assertRaises(Exception):
res = pipe.recursive_predict(tsdataset, 201)
def test_predict_proba(self):
"""
unittest function
@@ -333,4 +473,5 @@ def test_save_and_load(self):


if __name__ == "__main__":
unittest.main()
unittest.main()

