meta rules #1057

Merged: 14 commits, Mar 17, 2023
78 changes: 0 additions & 78 deletions fedot/api/api_utils/api_data_analyser.py

This file was deleted.

12 changes: 7 additions & 5 deletions fedot/api/api_utils/api_params_repository.py
@@ -1,4 +1,5 @@
import datetime
+ from functools import partial
from typing import Sequence


@@ -18,8 +19,8 @@ class ApiParamsRepository:

COMPOSER_REQUIREMENTS_KEYS = {'max_arity', 'max_depth', 'num_of_generations',
'early_stopping_iterations', 'early_stopping_timeout',
- 'parallelization_mode', 'use_input_preprocessing', 'show_progress',
- 'collect_intermediate_metric', 'keep_n_best',
+ 'parallelization_mode', 'use_input_preprocessing',
+ 'show_progress', 'collect_intermediate_metric', 'keep_n_best',
'keep_history', 'history_dir', 'cv_folds', 'validation_blocks'}

STATIC_INDIVIDUAL_METADATA_KEYS = {'use_input_preprocessing'}
@@ -66,6 +67,7 @@ def default_params_for_task(task_type: TaskTypesEnum) -> dict:
use_pipelines_cache=True,
use_preprocessing_cache=True,
use_input_preprocessing=True,
+ use_meta_rules=False,
cache_dir=None,
keep_history=True,
history_dir=None,
@@ -120,11 +122,11 @@ def get_params_for_gp_algorithm_params(self, params: dict) -> dict:
if params.get('genetic_scheme') == 'steady_state':
gp_algorithm_params['genetic_scheme_type'] = GeneticSchemeTypesEnum.steady_state

- gp_algorithm_params['mutation_types'] = ApiParamsRepository._get_default_mutations(self.task_type)
+ gp_algorithm_params['mutation_types'] = ApiParamsRepository._get_default_mutations(self.task_type, params)
return gp_algorithm_params

@staticmethod
- def _get_default_mutations(task_type: TaskTypesEnum) -> Sequence[MutationTypesEnum]:
+ def _get_default_mutations(task_type: TaskTypesEnum, params) -> Sequence[MutationTypesEnum]:
mutations = [parameter_change_mutation,
MutationTypesEnum.single_change,
MutationTypesEnum.single_drop,
@@ -133,6 +135,6 @@ def _get_default_mutations(task_type: TaskTypesEnum) -> Sequence[MutationTypesEn

# TODO remove workaround after boosting mutation fix
if task_type == TaskTypesEnum.ts_forecasting:
- mutations.append(boosting_mutation)
+ mutations.append(partial(boosting_mutation, params=params))

return mutations
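
Binding params through functools.partial keeps the GOLEM-facing mutation signature unchanged while still handing the API parameters to boosting_mutation. A minimal sketch of the mechanism (the stub and its caller below are illustrative only and not part of the diff):

# --- illustrative sketch, not part of the diff ---
from functools import partial

def boosting_mutation_stub(pipeline, requirements, graph_gen_params, params=None, **kwargs):
    # Stand-in for boosting_mutation: it needs `params` on top of what the optimiser passes.
    print(f'mutating with user params: {params}')
    return pipeline

# `params` is bound up front, as in _get_default_mutations above, so the optimiser
# can later call the mutation with only the arguments it knows about.
bound_mutation = partial(boosting_mutation_stub, params={'use_meta_rules': True})
bound_mutation(pipeline=None, requirements=None, graph_gen_params=None)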
129 changes: 129 additions & 0 deletions fedot/api/api_utils/input_analyser.py
@@ -0,0 +1,129 @@
from functools import partial
from inspect import signature

import numpy as np
from typing import Dict, Tuple, Any, Union

from golem.core.log import default_log

from fedot.core.composer.meta_rules import get_cv_folds_number, get_recommended_preset, \
get_early_stopping_generations
from fedot.core.data.data import InputData
from fedot.core.data.data_preprocessing import find_categorical_columns
from fedot.core.data.multi_modal import MultiModalData
from fedot.core.repository.dataset_types import DataTypesEnum


meta_rules = [get_cv_folds_number,
get_recommended_preset,
get_early_stopping_generations]


class InputAnalyser:
"""
Class to analyse the input that comes to the FEDOT API: input data and params.
All methods are applied inplace to avoid unnecessary copies of large datasets.
Its functionality is:
1) Cut large datasets to prevent memory overflow
2) Use label encoding with tree models instead of OneHot when the summary cardinality of categorical features is high
3) Give recommendations according to meta rules for a more successful optimization process
"""

def __init__(self, safe_mode: bool):
self.safe_mode = safe_mode
self.max_size = 50000000
self.max_cat_cardinality = 50
self._log = default_log('InputAnalyzer')

def give_recommendations(self, input_data: Union[InputData, MultiModalData], input_params=None) \
-> Tuple[Dict, Dict]:
"""
Gives recommendations for data and input parameters.
:param input_data: data for preprocessing
:param input_params: input parameters from FEDOT API
:return : two dicts with recommendations: one for data and one for params
"""

if input_params is None:
input_params = {}

recommendations_for_data = dict()
recommendations_for_params = dict()

if isinstance(input_data, MultiModalData):
for data_source_name, values in input_data.items():
recommendations_for_data[data_source_name], recommendations_for_params[data_source_name] = \
self.give_recommendations(input_data[data_source_name],
input_params=input_params)
elif isinstance(input_data, InputData) and input_data.data_type in [DataTypesEnum.table, DataTypesEnum.text]:
recommendations_for_data = self._give_recommendations_for_data(input_data=input_data)

recommendations_for_params = dict()
if 'use_meta_rules' in input_params.keys():
recommendations_for_params = self._give_recommendations_with_meta_rules(input_data=input_data,
input_params=input_params)
if 'label_encoded' in recommendations_for_data.keys():
recommendations_for_params['label_encoded'] = recommendations_for_data['label_encoded']

return recommendations_for_data, recommendations_for_params

def _give_recommendations_for_data(self, input_data: InputData) -> Dict:
"""
Gives recommendations on cutting the dataset or using label encoding
:param input_data: data for preprocessing
:return : dict with str recommendations
"""

recommendations_for_data = {}
if self.safe_mode:
is_cut_needed, border = self.control_size(input_data)
if is_cut_needed:
recommendations_for_data['cut'] = {'border': border}
is_label_encoding_needed = self.control_categorical(input_data)
if is_label_encoding_needed:
recommendations_for_data['label_encoded'] = {}
return recommendations_for_data

def _give_recommendations_with_meta_rules(self, input_data: InputData, input_params: Dict):
recommendations = dict()
for rule in meta_rules:
if 'input_params' in signature(rule).parameters:
rule = partial(rule, input_params=input_params)
if 'input_data' in signature(rule).parameters:
rule = partial(rule, input_data=input_data)
cur_recommendation = rule(log=self._log)
# if there is a recommendation to change the parameter
if list(cur_recommendation.values())[0]:
recommendations.update(cur_recommendation)
return recommendations

def control_size(self, input_data: InputData) -> Tuple[bool, Any]:
"""
Check whether the size of the table (N*M) exceeds the threshold and cutting is needed
:param input_data: data for preprocessing

:return : tuple (is_cut_needed, border), where border is set only if cutting is needed
"""

if input_data.data_type == DataTypesEnum.table:
if input_data.features.shape[0] * input_data.features.shape[1] > self.max_size:
border = self.max_size // input_data.features.shape[1]
return True, border
return False, None

def control_categorical(self, input_data: InputData) -> bool:
"""
Check whether to use label encoding instead of OneHot when the summary cardinality exceeds the threshold

:param input_data: data for preprocessing
"""

categorical_ids, _ = find_categorical_columns(input_data.features)
all_cardinality = 0
need_label = False
for idx in categorical_ids:
all_cardinality += np.unique(input_data.features[:, idx].astype(str)).shape[0]
if all_cardinality > self.max_cat_cardinality:
need_label = True
break
return need_label
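
_give_recommendations_with_meta_rules inspects each rule's signature and binds only the arguments the rule declares, so a rule may depend on the data, the params, or both. A hypothetical rule compatible with that dispatch (the real rules live in fedot.core.composer.meta_rules; the name, threshold and recommendation below are illustrative only):

# --- illustrative sketch, not part of the diff ---
def recommend_cv_folds_stub(input_data, log):
    # Hypothetical meta rule: suggest a cv_folds value for small tables.
    # It returns a one-key dict; the analyser keeps the recommendation only
    # if that value is truthy (see _give_recommendations_with_meta_rules).
    n_rows = input_data.features.shape[0]
    cv_folds = 3 if n_rows < 1000 else None  # threshold made up for the example
    if cv_folds is not None:
        log.info(f'Meta rule: recommend cv_folds={cv_folds} for {n_rows} rows')
    return {'cv_folds': cv_folds}

Because the stub declares input_data but not input_params, the analyser would bind only input_data via functools.partial and then call it as rule(log=self._log).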
8 changes: 7 additions & 1 deletion fedot/api/api_utils/params.py
@@ -66,6 +66,12 @@ def accept_and_apply_recommendations(self, input_data: Union[InputData, MultiMod
self.log.info("Change preset due to label encoding")
self.change_preset_for_label_encoded_data(input_data.task, input_data.data_type)

+ # update api params with recommendations obtained using meta rules
+ for key in self.data.keys():
+ if key not in recommendations:
+ continue
+ self.update({key: recommendations[key]})

def change_preset_for_label_encoded_data(self, task: Task, data_type: DataTypesEnum):
""" Change preset on tree like preset, if data had been label encoded """
if 'preset' in self:
@@ -89,7 +95,7 @@ def _get_task_with_params(self, problem: str, task_params: Optional[TaskParams]
}
try:
return task_dict[problem]
- except ValueError as exc:
+ except ValueError:
ValueError('Wrong type name of the given task')

def _check_timeout_vs_generations(self):
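
The new loop in accept_and_apply_recommendations copies only those recommendations whose keys already exist in the API params, so unknown keys cannot leak in. A rough sketch of the effect, with made-up parameter and recommendation values:

# --- illustrative sketch, not part of the diff ---
api_params = {'cv_folds': None, 'preset': 'auto', 'timeout': 5.0}           # hypothetical ApiParams state
recommendations = {'cv_folds': 5, 'preset': 'best_quality', 'unknown': 42}  # hypothetical meta-rule output

# Same filtering as in the diff above: keys absent from api_params are skipped.
for key in list(api_params.keys()):
    if key not in recommendations:
        continue
    api_params.update({key: recommendations[key]})

print(api_params)  # {'cv_folds': 5, 'preset': 'best_quality', 'timeout': 5.0}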
19 changes: 12 additions & 7 deletions fedot/api/main.py
@@ -12,7 +12,7 @@

from fedot.api.api_utils.api_composer import ApiComposer
from fedot.api.api_utils.api_data import ApiDataProcessor
- from fedot.api.api_utils.api_data_analyser import DataAnalyser
+ from fedot.api.api_utils.input_analyser import InputAnalyser
from fedot.api.api_utils.data_definition import FeaturesType, TargetType
from fedot.api.api_utils.metrics import ApiMetrics
from fedot.api.api_utils.params import ApiParams
@@ -112,6 +112,7 @@ class Fedot:
- ``automl`` -> A special preset with only AutoML libraries such as TPOT and H2O as operations.

use_input_preprocessing: bool indicating whether to do preprocessing of further given data, enabled by default.
+ use_meta_rules: bool indicating whether to adjust params according to FEDOT meta rules; disabled by default.
use_pipelines_cache: bool indicating whether to use pipeline structures caching, enabled by default.
use_preprocessing_cache: bool indicating whether to use optional preprocessors caching, enabled by default.
cache_dir: path to the place where cache files should be stored (if any cache is enabled).
Expand Down Expand Up @@ -143,7 +144,7 @@ def __init__(self,
# Initialize data processors for data preprocessing and preliminary data analysis
self.data_processor = ApiDataProcessor(task=self.params.task,
use_input_preprocessing=self.params.get('use_input_preprocessing'))
- self.data_analyser = DataAnalyser(safe_mode=safe_mode)
+ self.data_analyser = InputAnalyser(safe_mode=safe_mode)

self.target: Optional[TargetType] = None
self.prediction: Optional[OutputData] = None
@@ -184,11 +185,15 @@ def fit(self,

if self.params.get('use_input_preprocessing'):
# Launch data analyser - it gives recommendations for data preprocessing
- recommendations = self.data_analyser.give_recommendation(self.train_data)
- self.data_processor.accept_and_apply_recommendations(self.train_data, recommendations)
- self.params.accept_and_apply_recommendations(self.train_data, recommendations)
+ recommendations_for_data, recommendations_for_params = \
+ self.data_analyser.give_recommendations(input_data=self.train_data,
+ input_params=self.params)
+ self.data_processor.accept_and_apply_recommendations(input_data=self.train_data,
+ recommendations=recommendations_for_data)
+ self.params.accept_and_apply_recommendations(input_data=self.train_data,
+ recommendations=recommendations_for_params)
else:
- recommendations = None
+ recommendations_for_data = None

self._init_remote_if_necessary()

@@ -206,7 +211,7 @@
full_train_not_preprocessed = deepcopy(self.train_data)
# Final fit for obtained pipeline on full dataset
if self.history and not self.history.is_empty() or not self.current_pipeline.is_fitted:
- self._train_pipeline_on_full_dataset(recommendations, full_train_not_preprocessed)
+ self._train_pipeline_on_full_dataset(recommendations_for_data, full_train_not_preprocessed)
self.log.message('Final pipeline was fitted')
else:
self.log.message('Already fitted initial pipeline is used')
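
Taken together, these changes let the feature be switched on from the public API: the analyser returns separate recommendation dicts for the data and for the params, and fit() applies each to its own target. A hedged usage sketch (file paths and parameter values are placeholders):

# --- illustrative sketch, not part of the diff ---
from fedot.api.main import Fedot

# use_meta_rules is off by default (see default_params_for_task above);
# enabling it lets InputAnalyser suggest cv_folds, preset, early stopping, etc.
model = Fedot(problem='classification',
              timeout=5,
              use_input_preprocessing=True,
              use_meta_rules=True)

model.fit(features='train.csv', target='target')  # placeholder dataset
prediction = model.predict(features='test.csv')   # placeholder dataset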
12 changes: 6 additions & 6 deletions fedot/core/composer/gp_composer/specific_operators.py
@@ -1,7 +1,7 @@
from random import choice, random
from typing import List

- from golem.core.optimisers.genetic.operators.mutation import Mutation
+ from golem.core.optimisers.genetic.operators.base_mutations import get_mutation_prob

from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
@@ -10,14 +10,14 @@
from fedot.core.repository.tasks import TaskTypesEnum


- def parameter_change_mutation(pipeline: Pipeline, requirements, params, opt_params, **kwargs) -> Pipeline:
+ def parameter_change_mutation(pipeline: Pipeline, requirements, graph_gen_params, parameters, **kwargs) -> Pipeline:
"""
This type of mutation is passed over all nodes and changes
hyperparameters of the operations with probability - 'node mutation probability'
which is initialised inside the function
"""
- node_mutation_probability = Mutation.get_mutation_prob(mut_id=opt_params.mutation_strength,
- node=pipeline.root_node)
+ node_mutation_probability = get_mutation_prob(mut_id=parameters.mutation_strength,
+ node=pipeline.root_node)
for node in pipeline.nodes:
if random() < node_mutation_probability:
operation_name = node.operation.operation_type
@@ -34,11 +34,11 @@ def parameter_change_mutation(pipeline: Pipeline, requirements, params, opt_para
return pipeline


- def boosting_mutation(pipeline: Pipeline, requirements, params, **kwargs) -> Pipeline:
+ def boosting_mutation(pipeline: Pipeline, requirements, graph_gen_params, **kwargs) -> Pipeline:
""" This type of mutation adds the additional 'boosting' cascade to the existing pipeline """

# TODO: refactor next line to get task_type more obviously
- task_type = params.advisor.task.task_type
+ task_type = graph_gen_params.advisor.task.task_type
decompose_operations = OperationTypesRepository('data_operation').suitable_operation(
task_type=task_type, tags=['decompose'])
decompose_operation = decompose_operations[0]
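
The renamed arguments (graph_gen_params, parameters) match the keyword names GOLEM now uses when invoking mutation operators, and get_mutation_prob has moved from a Mutation method to a module-level helper in base_mutations. A rough sketch of the calling convention these signatures rely on (the invocation itself is an assumption about GOLEM's internals, shown only to make the keyword names explicit):

# --- illustrative sketch, not part of the diff ---
def apply_custom_mutation(mutation, pipeline, requirements, graph_gen_params, parameters):
    # A GOLEM-style optimiser is assumed to pass everything by keyword, which is
    # why the positional names in the mutation signatures above had to be renamed.
    return mutation(pipeline,
                    requirements=requirements,
                    graph_gen_params=graph_gen_params,
                    parameters=parameters)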