Skip to content

Commit

Permalink
Merge pull request #160 from crowdcent/feature/parallel-quantile
Browse files Browse the repository at this point in the history
Feature/parallel quantile
  • Loading branch information
CarloLepelaars authored Jan 10, 2024
2 parents 36f2296 + 2ec7cdd commit f595f25
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
42 changes: 31 additions & 11 deletions numerblox/preprocessing/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pandas as pd
import pandas_ta as ta
from tqdm.auto import tqdm
from joblib import Parallel, delayed
from typing import Union, Tuple, List
from multiprocessing.pool import Pool
from sklearn.utils.validation import check_is_fitted
Expand Down Expand Up @@ -252,22 +253,30 @@ class EraQuantileProcessor(BasePreProcessor):
"""
Transform features into quantiles by era.
:param num_quantiles: Number of quantiles to use for quantile transformation.
:param random_state: Random state for QuantileTransformer.
:param random_state: Random state for QuantileTransformer.
:param cpu_cores: Number of CPU cores to use for parallel processing.
"""
def __init__(
self,
num_quantiles: int = 50,
random_state: int = 0
random_state: int = 0,
cpu_cores: int = -1,
):
super().__init__()
self.num_quantiles = num_quantiles
self.random_state = random_state

def _process_feature(self, group_data: pd.Series) -> pd.Series:
quantizer = QuantileTransformer(
self.cpu_cores = cpu_cores
self.quantiler = QuantileTransformer(
n_quantiles=self.num_quantiles, random_state=self.random_state
)
transformed_data = quantizer.fit_transform(group_data.to_frame()).ravel()

def _quantile_transform(self, group_data: pd.Series) -> pd.Series:
"""
Process single feature for a single era.
:param group_data: Data for a single feature and era.
:return: Quantile transformed data.
"""
transformed_data = self.quantiler.fit_transform(group_data.to_frame()).ravel()
return pd.Series(transformed_data, index=group_data.index)

def fit(self, X: Union[np.array, pd.DataFrame], y=None, eras: pd.Series = None):
Expand All @@ -277,15 +286,26 @@ def transform(
self, X: Union[np.array, pd.DataFrame],
eras: pd.Series,
) -> np.array:
"""
Quantile all features by era.
:param X: Array or DataFrame containing features to be quantiled.
:param eras: Series containing era information.
:return: Quantiled features.
"""
X = pd.DataFrame(X)
assert X.shape[0] == eras.shape[0], "Input X and eras must have the same number of rows for quantiling."
self.features = [col for col in X.columns]
print(f"Quantiling for {len(self.features)} features.")
X.loc[:, "era"] = eras
date_groups = X.groupby('era', group_keys=False)
output_df = pd.DataFrame()
for feature in tqdm(self.features):
group_data = date_groups[feature].apply(lambda x: self._process_feature(x))
output_df[f"{feature}_quantile{self.num_quantiles}"] = group_data

def process_feature(feature):
group_data = date_groups[feature].apply(lambda x: self._quantile_transform(x))
return pd.Series(group_data, name=f"{feature}_quantile{self.num_quantiles}")

output_series_list = Parallel(n_jobs=self.cpu_cores)(
delayed(process_feature)(feature) for feature in tqdm(self.features, desc=f"Quantiling {len(self.features)} features")
)
output_df = pd.concat(output_series_list, axis=1)
return output_df.to_numpy()

def fit_transform(self, X: Union[np.array, pd.DataFrame], eras: pd.Series):
Expand Down
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "numerblox"
version = "1.1.16"
version = "1.1.17"
description = "Solid Numerai Pipelines"
authors = ["CrowdCent <[email protected]>"]
license = "MIT License"
Expand Down

0 comments on commit f595f25

Please sign in to comment.