diff --git a/fedot/api/api_utils/api_data_analyser.py b/fedot/api/api_utils/api_data_analyser.py
deleted file mode 100644
index fc9a3dd63f..0000000000
--- a/fedot/api/api_utils/api_data_analyser.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import numpy as np
-from typing import Dict, Tuple, Any
-
-from fedot.core.data.data import InputData
-from fedot.core.data.data_preprocessing import find_categorical_columns
-from fedot.core.data.multi_modal import MultiModalData
-from fedot.core.repository.dataset_types import DataTypesEnum
-from fedot.preprocessing.structure import DEFAULT_SOURCE_NAME
-
-
-class DataAnalyser:
-    """
-    Class to analyse data that comes to FEDOT API.
-    All methods are inplace to prevent unnecessary copy of large datasets
-    It functionality is:
-    1) Cut large datasets to prevent memory stackoverflow
-    2) Use label encoder with tree models instead OneHot when summary cardinality of categorical features is high
-    """
-
-    def __init__(self, safe_mode: bool):
-        self.safe_mode = safe_mode
-        self.max_size = 50000000
-        self.max_cat_cardinality = 50
-
-    # TODO implement correct logic to process multimodal data
-    def give_recommendation(self, input_data: InputData, source_name: str = DEFAULT_SOURCE_NAME) -> Dict:
-        """
-        Gives a recommendation of cutting dataset or using label encoding
-        :param input_data: data for preprocessing
-        :param source_name: name of data source node
-        :return : dict with str recommendations
-        """
-
-        recommendations = {}
-        if isinstance(input_data, MultiModalData):
-            for data_source_name, values in input_data.items():
-                recommendations[data_source_name] = self.give_recommendation(input_data[data_source_name],
-                                                                             data_source_name)
-        elif isinstance(input_data, InputData) and input_data.data_type == DataTypesEnum.table:
-            if self.safe_mode:
-                is_cut_needed, border = self.control_size(input_data)
-                if is_cut_needed:
-                    recommendations['cut'] = {'border': border}
-                is_label_encoding_needed = self.control_categorical(input_data)
-                if is_label_encoding_needed:
-                    recommendations['label_encoded'] = {}
-        return recommendations
-
-    def control_size(self, input_data: InputData) -> Tuple[bool, Any]:
-        """
-        Check if size of table (N*M) > threshold and cutting is needed
-        :param input_data: data for preprocessing
-
-        :return : (is_cut_needed, border) is cutting is needed | if yes - border of cutting,
-        """
-
-        if input_data.data_type == DataTypesEnum.table:
-            if input_data.features.shape[0] * input_data.features.shape[1] > self.max_size:
-                border = self.max_size // input_data.features.shape[1]
-                return True, border
-        return False, None
-
-    def control_categorical(self, input_data: InputData) -> bool:
-        """
-        Check if use label encoder instead oneHot if summary cardinality > threshold
-
-        :param input_data: data for preprocessing
-        """
-
-        categorical_ids, _ = find_categorical_columns(input_data.features)
-        all_cardinality = 0
-        need_label = False
-        for idx in categorical_ids:
-            all_cardinality += np.unique(input_data.features[:, idx].astype(str)).shape[0]
-            if all_cardinality > self.max_cat_cardinality:
-                need_label = True
-                break
-        return need_label
diff --git a/fedot/api/api_utils/api_params_repository.py b/fedot/api/api_utils/api_params_repository.py
index e0a7f77be6..d96b448809 100644
--- a/fedot/api/api_utils/api_params_repository.py
+++ b/fedot/api/api_utils/api_params_repository.py
@@ -1,4 +1,5 @@
 import datetime
+from functools import partial
 
 from typing import Sequence
 
@@ -18,8 +19,8 @@ class ApiParamsRepository:
 
     COMPOSER_REQUIREMENTS_KEYS = {'max_arity', 'max_depth',
                                   'num_of_generations', 'early_stopping_iterations', 'early_stopping_timeout',
-                                  'parallelization_mode', 'use_input_preprocessing', 'show_progress',
-                                  'collect_intermediate_metric', 'keep_n_best',
+                                  'parallelization_mode', 'use_input_preprocessing',
+                                  'show_progress', 'collect_intermediate_metric', 'keep_n_best',
                                   'keep_history', 'history_dir', 'cv_folds', 'validation_blocks'}
 
     STATIC_INDIVIDUAL_METADATA_KEYS = {'use_input_preprocessing'}
@@ -66,6 +67,7 @@ def default_params_for_task(task_type: TaskTypesEnum) -> dict:
             use_pipelines_cache=True,
             use_preprocessing_cache=True,
             use_input_preprocessing=True,
+            use_meta_rules=False,
             cache_dir=None,
             keep_history=True,
             history_dir=None,
@@ -120,11 +122,11 @@ def get_params_for_gp_algorithm_params(self, params: dict) -> dict:
         if params.get('genetic_scheme') == 'steady_state':
             gp_algorithm_params['genetic_scheme_type'] = GeneticSchemeTypesEnum.steady_state
 
-        gp_algorithm_params['mutation_types'] = ApiParamsRepository._get_default_mutations(self.task_type)
+        gp_algorithm_params['mutation_types'] = ApiParamsRepository._get_default_mutations(self.task_type, params)
         return gp_algorithm_params
 
     @staticmethod
-    def _get_default_mutations(task_type: TaskTypesEnum) -> Sequence[MutationTypesEnum]:
+    def _get_default_mutations(task_type: TaskTypesEnum, params) -> Sequence[MutationTypesEnum]:
         mutations = [parameter_change_mutation,
                      MutationTypesEnum.single_change,
                      MutationTypesEnum.single_drop,
@@ -133,6 +135,6 @@ def _get_default_mutations(task_type: TaskTypesEnum) -> Sequence[MutationTypesEn
 
         # TODO remove workaround after boosting mutation fix
         if task_type == TaskTypesEnum.ts_forecasting:
-            mutations.append(boosting_mutation)
+            mutations.append(partial(boosting_mutation, params=params))
 
         return mutations
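Note on the `partial(boosting_mutation, params=params)` change above: `functools.partial` pre-binds the API params as a keyword argument, so the optimizer can later invoke the mutation with its usual arguments only. A minimal sketch of the mechanism (stub names, not the FEDOT call signature):

```python
from functools import partial


def boosting_mutation_stub(pipeline, requirements, graph_gen_params, **kwargs):
    # `params` arrives through **kwargs because it was pre-bound below
    return pipeline, kwargs.get('params')


# bind the keyword argument once, when the default mutations are assembled
mutation = partial(boosting_mutation_stub, params={'timeout': 10})

# later callers pass only the remaining arguments
pipeline, bound_params = mutation('pipeline', 'requirements', 'graph_gen_params')
assert bound_params == {'timeout': 10}
```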
diff --git a/fedot/api/api_utils/input_analyser.py b/fedot/api/api_utils/input_analyser.py
new file mode 100644
index 0000000000..1f5023c33f
--- /dev/null
+++ b/fedot/api/api_utils/input_analyser.py
@@ -0,0 +1,129 @@
+from functools import partial
+from inspect import signature
+
+import numpy as np
+from typing import Dict, Tuple, Any, Union
+
+from golem.core.log import default_log
+
+from fedot.core.composer.meta_rules import get_cv_folds_number, get_recommended_preset, \
+    get_early_stopping_generations
+from fedot.core.data.data import InputData
+from fedot.core.data.data_preprocessing import find_categorical_columns
+from fedot.core.data.multi_modal import MultiModalData
+from fedot.core.repository.dataset_types import DataTypesEnum
+
+
+meta_rules = [get_cv_folds_number,
+              get_recommended_preset,
+              get_early_stopping_generations]
+
+
+class InputAnalyser:
+    """
+    Class to analyse input that comes to FEDOT API: input data and params.
+    All methods are inplace to prevent unnecessary copy of large datasets.
+    Its functionality is:
+    1) Cut large datasets to prevent memory stackoverflow
+    2) Use label encoder with tree models instead of OneHot when summary cardinality of categorical features is high
+    3) Give recommendations according to meta rules for a more successful optimization process
+    """
+
+    def __init__(self, safe_mode: bool):
+        self.safe_mode = safe_mode
+        self.max_size = 50000000
+        self.max_cat_cardinality = 50
+        self._log = default_log('InputAnalyzer')
+
+    def give_recommendations(self, input_data: Union[InputData, MultiModalData], input_params=None) \
+            -> Tuple[Dict, Dict]:
+        """
+        Gives recommendations for data and input parameters.
+
+        :param input_data: data for preprocessing
+        :param input_params: input parameters from FEDOT API
+        :return : two dicts with recommendations for data and for params
+        """
+
+        if input_params is None:
+            input_params = {}
+
+        recommendations_for_data = dict()
+        recommendations_for_params = dict()
+
+        if isinstance(input_data, MultiModalData):
+            for data_source_name, values in input_data.items():
+                recommendations_for_data[data_source_name], recommendations_for_params[data_source_name] = \
+                    self.give_recommendations(input_data[data_source_name],
+                                              input_params=input_params)
+        elif isinstance(input_data, InputData) and input_data.data_type in [DataTypesEnum.table, DataTypesEnum.text]:
+            recommendations_for_data = self._give_recommendations_for_data(input_data=input_data)
+
+            recommendations_for_params = dict()
+            if 'use_meta_rules' in input_params.keys():
+                recommendations_for_params = self._give_recommendations_with_meta_rules(input_data=input_data,
+                                                                                        input_params=input_params)
+            if 'label_encoded' in recommendations_for_data.keys():
+                recommendations_for_params['label_encoded'] = recommendations_for_data['label_encoded']
+
+        return recommendations_for_data, recommendations_for_params
+
+    def _give_recommendations_for_data(self, input_data: InputData) -> Dict:
+        """
+        Gives a recommendation of cutting dataset or using label encoding
+        :param input_data: data for preprocessing
+        :return : dict with str recommendations
+        """
+
+        recommendations_for_data = {}
+        if self.safe_mode:
+            is_cut_needed, border = self.control_size(input_data)
+            if is_cut_needed:
+                recommendations_for_data['cut'] = {'border': border}
+            is_label_encoding_needed = self.control_categorical(input_data)
+            if is_label_encoding_needed:
+                recommendations_for_data['label_encoded'] = {}
+        return recommendations_for_data
+
+    def _give_recommendations_with_meta_rules(self, input_data: InputData, input_params: Dict):
+        recommendations = dict()
+        for rule in meta_rules:
+            if 'input_params' in signature(rule).parameters:
+                rule = partial(rule, input_params=input_params)
+            if 'input_data' in signature(rule).parameters:
+                rule = partial(rule, input_data=input_data)
+            cur_recommendation = rule(log=self._log)
+            # if there is a recommendation to change the parameter
+            if list(cur_recommendation.values())[0]:
+                recommendations.update(cur_recommendation)
+        return recommendations
+
+    def control_size(self, input_data: InputData) -> Tuple[bool, Any]:
+        """
+        Check if size of table (N*M) > threshold and cutting is needed
+        :param input_data: data for preprocessing
+
+        :return : (is_cut_needed, border) - whether cutting is needed and, if so, the border to cut at
+        """
+
+        if input_data.data_type == DataTypesEnum.table:
+            if input_data.features.shape[0] * input_data.features.shape[1] > self.max_size:
+                border = self.max_size // input_data.features.shape[1]
+                return True, border
+        return False, None
+
+    def control_categorical(self, input_data: InputData) -> bool:
+        """
+        Check if label encoder should be used instead of OneHot when summary cardinality > threshold
+
+        :param input_data: data for preprocessing
+        """
+
+        categorical_ids, _ = find_categorical_columns(input_data.features)
+        all_cardinality = 0
+        need_label = False
+        for idx in categorical_ids:
+            all_cardinality += np.unique(input_data.features[:, idx].astype(str)).shape[0]
+            if all_cardinality > self.max_cat_cardinality:
+                need_label = True
+                break
+        return need_label
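A minimal sketch of how the new `InputAnalyser` is exercised, in the spirit of the tests further below; the fixture values and thresholds are illustrative, not taken from the repository:

```python
import numpy as np

from fedot.api.api_utils.input_analyser import InputAnalyser
from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum

# a tiny table with categorical features (illustrative fixture)
features = np.array([['a', 'qq', 1.5], ['b', 'pp', 2.5],
                     ['c', 'qq', 3.5], ['a', 'pp', 4.5]], dtype=object)
target = np.array([0, 1, 0, 1])
data = InputData(idx=np.arange(len(target)), features=features, target=target,
                 task=Task(TaskTypesEnum.classification), data_type=DataTypesEnum.table)

analyser = InputAnalyser(safe_mode=True)
analyser.max_size = 6             # artificially small thresholds to trigger recommendations
analyser.max_cat_cardinality = 2

recs_for_data, recs_for_params = analyser.give_recommendations(data)
# e.g. {'cut': {'border': 2}, 'label_encoded': {}}, depending on the thresholds
print(recs_for_data, recs_for_params)
```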
diff --git a/fedot/api/api_utils/params.py b/fedot/api/api_utils/params.py
index 1b68e8a07c..2131add3b6 100644
--- a/fedot/api/api_utils/params.py
+++ b/fedot/api/api_utils/params.py
@@ -66,6 +66,12 @@ def accept_and_apply_recommendations(self, input_data: Union[InputData, MultiMod
                 self.log.info("Change preset due to label encoding")
                 self.change_preset_for_label_encoded_data(input_data.task, input_data.data_type)
 
+        # update api params with recommendations obtained using meta rules
+        for key in self.data.keys():
+            if key not in recommendations:
+                continue
+            self.update({key: recommendations[key]})
+
     def change_preset_for_label_encoded_data(self, task: Task, data_type: DataTypesEnum):
         """ Change preset on tree like preset, if data had been label encoded """
         if 'preset' in self:
@@ -89,7 +95,7 @@ def _get_task_with_params(self, problem: str, task_params: Optional[TaskParams]
         }
         try:
             return task_dict[problem]
-        except ValueError as exc:
+        except ValueError:
             ValueError('Wrong type name of the given task')
 
     def _check_timeout_vs_generations(self):
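The loop added to `accept_and_apply_recommendations` only overwrites keys that already exist in the stored api params; unknown recommendation keys are ignored. A toy illustration of that update rule (plain dicts, not the `ApiParams` class):

```python
# only parameters already known to the api params are overwritten by recommendations
api_params = {'cv_folds': None, 'preset': 'auto', 'timeout': 5.0}
recommendations = {'cv_folds': 5, 'unknown_param': 42}

for key in list(api_params.keys()):
    if key in recommendations:
        api_params[key] = recommendations[key]

assert api_params == {'cv_folds': 5, 'preset': 'auto', 'timeout': 5.0}
```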
diff --git a/fedot/api/main.py b/fedot/api/main.py
index e75f11a9e3..2cbba6fa38 100644
--- a/fedot/api/main.py
+++ b/fedot/api/main.py
@@ -12,7 +12,7 @@
 
 from fedot.api.api_utils.api_composer import ApiComposer
 from fedot.api.api_utils.api_data import ApiDataProcessor
-from fedot.api.api_utils.api_data_analyser import DataAnalyser
+from fedot.api.api_utils.input_analyser import InputAnalyser
 from fedot.api.api_utils.data_definition import FeaturesType, TargetType
 from fedot.api.api_utils.metrics import ApiMetrics
 from fedot.api.api_utils.params import ApiParams
@@ -112,6 +112,7 @@ class Fedot:
             - ``automl`` -> A special preset with only AutoML libraries such as TPOT and H2O as operations.
 
         use_input_preprocessing: bool indicating whether to do preprocessing of further given data, enabled by default.
+        use_meta_rules: bool indicating whether to change the set params according to FEDOT meta rules.
         use_pipelines_cache: bool indicating whether to use pipeline structures caching, enabled by default.
         use_preprocessing_cache: bool indicating whether to use optional preprocessors caching, enabled by default.
         cache_dir: path to the place where cache files should be stored (if any cache is enabled).
@@ -143,7 +144,7 @@ def __init__(self,
         # Initialize data processors for data preprocessing and preliminary data analysis
         self.data_processor = ApiDataProcessor(task=self.params.task,
                                                use_input_preprocessing=self.params.get('use_input_preprocessing'))
-        self.data_analyser = DataAnalyser(safe_mode=safe_mode)
+        self.data_analyser = InputAnalyser(safe_mode=safe_mode)
 
         self.target: Optional[TargetType] = None
         self.prediction: Optional[OutputData] = None
@@ -184,11 +185,15 @@ def fit(self,
 
         if self.params.get('use_input_preprocessing'):
             # Launch data analyser - it gives recommendations for data preprocessing
-            recommendations = self.data_analyser.give_recommendation(self.train_data)
-            self.data_processor.accept_and_apply_recommendations(self.train_data, recommendations)
-            self.params.accept_and_apply_recommendations(self.train_data, recommendations)
+            recommendations_for_data, recommendations_for_params = \
+                self.data_analyser.give_recommendations(input_data=self.train_data,
+                                                        input_params=self.params)
+            self.data_processor.accept_and_apply_recommendations(input_data=self.train_data,
+                                                                 recommendations=recommendations_for_data)
+            self.params.accept_and_apply_recommendations(input_data=self.train_data,
+                                                         recommendations=recommendations_for_params)
         else:
-            recommendations = None
+            recommendations_for_data = None
 
         self._init_remote_if_necessary()
 
@@ -206,7 +211,7 @@ def fit(self,
             full_train_not_preprocessed = deepcopy(self.train_data)
             # Final fit for obtained pipeline on full dataset
             if self.history and not self.history.is_empty() or not self.current_pipeline.is_fitted:
-                self._train_pipeline_on_full_dataset(recommendations, full_train_not_preprocessed)
+                self._train_pipeline_on_full_dataset(recommendations_for_data, full_train_not_preprocessed)
                 self.log.message('Final pipeline was fitted')
             else:
                 self.log.message('Already fitted initial pipeline is used')
diff --git a/fedot/core/composer/gp_composer/specific_operators.py b/fedot/core/composer/gp_composer/specific_operators.py
index e679340a65..439bd89fad 100644
--- a/fedot/core/composer/gp_composer/specific_operators.py
+++ b/fedot/core/composer/gp_composer/specific_operators.py
@@ -1,7 +1,7 @@
 from random import choice, random
 from typing import List
 
-from golem.core.optimisers.genetic.operators.mutation import Mutation
+from golem.core.optimisers.genetic.operators.base_mutations import get_mutation_prob
 
 from fedot.core.pipelines.node import PipelineNode
 from fedot.core.pipelines.pipeline import Pipeline
@@ -10,14 +10,14 @@
 from fedot.core.repository.tasks import TaskTypesEnum
 
 
-def parameter_change_mutation(pipeline: Pipeline, requirements, params, opt_params, **kwargs) -> Pipeline:
+def parameter_change_mutation(pipeline: Pipeline, requirements, graph_gen_params, parameters, **kwargs) -> Pipeline:
     """
     This type of mutation is passed over all nodes and changes hyperparameters of the operations with probability -
     'node mutation probability' which is initialised inside the function
     """
-    node_mutation_probability = Mutation.get_mutation_prob(mut_id=opt_params.mutation_strength,
-                                                           node=pipeline.root_node)
+    node_mutation_probability = get_mutation_prob(mut_id=parameters.mutation_strength,
+                                                  node=pipeline.root_node)
     for node in pipeline.nodes:
         if random() < node_mutation_probability:
             operation_name = node.operation.operation_type
@@ -34,11 +34,11 @@ def parameter_change_mutation(pipeline: Pipeline, requirements, params, opt_para
     return pipeline
 
 
-def boosting_mutation(pipeline: Pipeline, requirements, params, **kwargs) -> Pipeline:
+def boosting_mutation(pipeline: Pipeline, requirements, graph_gen_params, **kwargs) -> Pipeline:
    """ This type of mutation adds the additional 'boosting' cascade to the existing pipeline """
     # TODO: refactor next line to get task_type more obviously
-    task_type = params.advisor.task.task_type
+    task_type = graph_gen_params.advisor.task.task_type
     decompose_operations = OperationTypesRepository('data_operation').suitable_operation(
         task_type=task_type, tags=['decompose'])
     decompose_operation = decompose_operations[0]
diff --git a/fedot/core/composer/meta_rules.py b/fedot/core/composer/meta_rules.py
new file mode 100644
index 0000000000..da1aef7688
--- /dev/null
+++ b/fedot/core/composer/meta_rules.py
@@ -0,0 +1,57 @@
+from golem.core.log import LoggerAdapter
+from typing import Dict, Optional
+
+from fedot.api.api_utils.params import ApiParams
+from fedot.core.data.data import InputData
+
+
+def get_cv_folds_number(input_data: InputData, log: LoggerAdapter) -> Dict[str, Optional[int]]:
+    """ Cross-validation folds are available from 1 to 10. """
+    row_num = input_data.features.shape[0]
+    if row_num < 1000:
+        cv_folds = None
+    elif row_num < 20000:
+        cv_folds = 3
+    else:
+        cv_folds = 5
+    log.info(f'number of cv_folds param was set to {cv_folds}')
+    return {'cv_folds': cv_folds}
+
+
+def get_recommended_preset(input_data: InputData, input_params: ApiParams, log: LoggerAdapter) \
+        -> Dict[str, Optional[str]]:
+    """ Get appropriate preset for `input_data` and `input_params`. """
+    preset = None
+
+    if input_params.timeout:
+        # since there is enough time for optimization on such an amount of data, heavy models can be used
+        if input_params.timeout >= 60 and \
+                input_data.features.shape[0] * input_data.features.shape[1] < 300000:
+            preset = 'best_quality'
+
+        # to avoid stagnation due to too long evaluation of one population
+        if input_params.timeout < 10 \
+                and input_data.features.shape[0] * input_data.features.shape[1] > 300000:
+            preset = 'fast_train'
+
+    # to avoid overfitting for small datasets
+    if input_data.features.shape[0] < 5000:
+        preset = 'fast_train'
+
+    if preset:
+        log.info(f'preset was set to {preset}')
+    return {'preset': preset}
+
+
+def get_early_stopping_generations(input_params: ApiParams, log: LoggerAdapter) -> Dict[str, Optional[int]]:
+    """ Get number of early stopping generations depending on timeout. """
+    # If early_stopping_iterations is not specified,
+    # then estimate it in a time-based manner as 0.33 * composing_timeout.
+    # The minimal number of generations is 5.
+    early_stopping_iterations = None
+    if 'early_stopping_iterations' not in input_params:
+        if input_params.timeout:
+            depending_on_timeout = int(input_params.timeout / 3)
+            early_stopping_iterations = depending_on_timeout if depending_on_timeout > 5 else 5
+    log.info(f'early_stopping_iterations was set to {early_stopping_iterations}')
+    return {'early_stopping_iterations': early_stopping_iterations}
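The rules above follow a common convention: declare only the arguments you need (`input_data` and/or `input_params`) plus `log`, and return a single-key dict whose value is `None` when there is nothing to recommend — `InputAnalyser` binds the arguments via `inspect.signature` and skips `None` values. A hypothetical extra rule in the same shape (not part of this diff; appending to `meta_rules` as an extension point is an assumption, not a documented API):

```python
from typing import Dict, Optional

from golem.core.log import LoggerAdapter

from fedot.core.data.data import InputData


def get_recommended_pop_size(input_data: InputData, log: LoggerAdapter) -> Dict[str, Optional[int]]:
    """ Hypothetical rule: suggest a smaller population for very large tables. """
    row_num = input_data.features.shape[0]
    pop_size = 10 if row_num > 100000 else None  # None means "no recommendation"
    if pop_size:
        log.info(f'pop_size param was set to {pop_size}')
    return {'pop_size': pop_size}

# plugging it in would amount to `meta_rules.append(get_recommended_pop_size)`
# in fedot/api/api_utils/input_analyser.py (an assumption about intended extensibility)
```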
diff --git a/test/unit/adapter/test_adapt_pipeline.py b/test/unit/adapter/test_adapt_pipeline.py
index 0a50a0b2fc..d35b347697 100644
--- a/test/unit/adapter/test_adapt_pipeline.py
+++ b/test/unit/adapter/test_adapt_pipeline.py
@@ -9,6 +9,7 @@
 from golem.core.dag.verification_rules import DEFAULT_DAG_RULES
 from golem.core.optimisers.graph import OptNode
 
+from fedot.core.operations.operation import Operation
 from fedot.core.pipelines.adapters import PipelineAdapter
 from fedot.core.pipelines.node import PipelineNode
 from fedot.core.pipelines.pipeline import Pipeline
@@ -184,7 +185,7 @@ def test_changes_to_transformed_dont_affect_origin(pipeline):
     assert opt_graph.descriptive_id == restored_pipeline.descriptive_id
 
     changed_node = choice(restored_pipeline.nodes)
-    changed_node.content['name'] = 'yet_another_operation'
+    changed_node.content['name'] = Operation('yet_another_operation')
     changed_node.content['params'].update({'new_hyperparam': 4242})
 
     # assert that changes to the restored graph don't affect original graph
diff --git a/test/unit/api/test_api_safety.py b/test/unit/api/test_api_safety.py
index db75d207a5..1fcd5dc5e5 100644
--- a/test/unit/api/test_api_safety.py
+++ b/test/unit/api/test_api_safety.py
@@ -1,7 +1,7 @@
 import numpy as np
 
 from fedot.api.api_utils.api_data import ApiDataProcessor
-from fedot.api.api_utils.api_data_analyser import DataAnalyser
+from fedot.api.api_utils.input_analyser import InputAnalyser
 from fedot.api.main import Fedot
 from fedot.core.data.data import InputData
 from fedot.core.repository.dataset_types import DataTypesEnum
@@ -12,7 +12,7 @@
 def get_data_analyser_with_specific_params(max_size=18, max_cat_cardinality=5):
     """ Create a DataAnalyser object with small max dataset size and small max cardinality
      for categorical features"""
-    safety_module = DataAnalyser(safe_mode=True)
+    safety_module = InputAnalyser(safe_mode=True)
     preprocessor = ApiDataProcessor(Task(TaskTypesEnum.classification))
     safety_module.max_size = max_size
     safety_module.max_cat_cardinality = max_cat_cardinality
@@ -44,8 +44,8 @@ def test_safety_label_correct():
     """
     api_safety, api_preprocessor = get_data_analyser_with_specific_params()
     data = get_small_cat_data()
-    recs = api_safety.give_recommendation(data)
-    api_preprocessor.accept_and_apply_recommendations(data, recs)
+    recs_for_data, _ = api_safety.give_recommendations(data)
+    api_preprocessor.accept_and_apply_recommendations(data, recs_for_data)
     assert data.features.shape[0] * data.features.shape[1] <= api_safety.max_size
     assert data.features.shape[1] == 3
     assert data.features[0, 0] != 'a'
@@ -58,14 +58,14 @@ def test_recommendations_works_correct_in_final_fit():
 
     api_safety, api_preprocessor = get_data_analyser_with_specific_params()
     data = get_small_cat_data()
-    recs = api_safety.give_recommendation(data)
-    api_preprocessor.accept_and_apply_recommendations(data, recs)
+    recs_for_data, _ = api_safety.give_recommendations(data)
+    api_preprocessor.accept_and_apply_recommendations(data, recs_for_data)
 
     data_new = get_small_cat_data()
-    if recs:
+    if recs_for_data:
         # if data was cut we need to refit pipeline on full data
         api_preprocessor.accept_and_apply_recommendations(data_new,
-                                                          {k: v for k, v in recs.items()
+                                                          {k: v for k, v in recs_for_data.items()
                                                            if k != 'cut'})
     assert data_new.features.shape[1] == 3
@@ -78,8 +78,8 @@ def test_no_safety_needed_correct():
     """
     api_safety, api_preprocessor = get_data_analyser_with_specific_params(max_size=100, max_cat_cardinality=100)
     data = get_small_cat_data()
-    recs = api_safety.give_recommendation(data)
-    api_preprocessor.accept_and_apply_recommendations(data, recs)
+    recs_for_data, _ = api_safety.give_recommendations(data)
+    api_preprocessor.accept_and_apply_recommendations(data, recs_for_data)
     assert data.features.shape[0] * data.features.shape[1] == 24
     assert data.features.shape[1] == 3
     assert data.features[0, 0] == 'a'
diff --git a/test/unit/api/test_api_utils.py b/test/unit/api/test_api_utils.py
index 30e7ca67ae..d807101234 100644
--- a/test/unit/api/test_api_utils.py
+++ b/test/unit/api/test_api_utils.py
@@ -59,9 +59,9 @@ def test_predefined_initial_assumption():
                                          target=train_input.target,
                                          is_predict=False)
     old_params = deepcopy(model.params)
-    recommendations = model.data_analyser.give_recommendation(model.train_data)
-    model.data_processor.accept_and_apply_recommendations(model.train_data, recommendations)
-    model.params.accept_and_apply_recommendations(model.train_data, recommendations)
+    recs_for_data, _ = model.data_analyser.give_recommendations(model.train_data)
+    model.data_processor.accept_and_apply_recommendations(model.train_data, recs_for_data)
+    model.params.accept_and_apply_recommendations(model.train_data, recs_for_data)
 
     assert model.params.get('initial_assumption') is not None
     assert len(old_params) == len(model.params)
diff --git a/test/unit/optimizer/gp_operators/test_mutation.py b/test/unit/optimizer/gp_operators/test_mutation.py
index 1dd0366739..66ea7e048a 100644
--- a/test/unit/optimizer/gp_operators/test_mutation.py
+++ b/test/unit/optimizer/gp_operators/test_mutation.py
@@ -96,7 +96,7 @@ def get_ts_forecasting_graph_with_boosting() -> Pipeline:
     node_model = PipelineNode('ar', nodes_from=[node_init])
     node_lagged = PipelineNode('lagged', nodes_from=[node_init])
     node_decompose = PipelineNode('decompose',
-                                   [node_model, node_lagged])
+                                  [node_model, node_lagged])
     node_ridge = PipelineNode('ridge', nodes_from=[node_decompose])
     node_final = PipelineNode('ridge', nodes_from=[node_ridge, node_model])
     pipeline = Pipeline(node_final)
@@ -164,7 +164,8 @@ def test_no_opt_or_graph_nodes_after_mutation():
     adapter = PipelineAdapter()
     graph = get_simple_linear_graph()
     mutation = get_mutation_obj()
-    new_graph, _ = mutation._adapt_and_apply_mutations(new_graph=graph, num_mut=1)
-    new_pipeline = adapter.restore(new_graph)
+    for mut in mutation.parameters.mutation_types:
+        graph, _ = mutation._adapt_and_apply_mutation(new_graph=graph, mutation_type=mut)
+    new_pipeline = adapter.restore(graph)
 
     assert not find_first(new_pipeline, lambda n: type(n) in (GraphNode, OptNode))
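End-to-end, the new behaviour is switched on through the `use_meta_rules` flag added in `api_params_repository.py` and documented in `main.py`. A hedged usage sketch (the dataset path and target column are placeholders):

```python
from fedot.api.main import Fedot

model = Fedot(problem='classification',
              timeout=5,
              use_input_preprocessing=True,  # the analyser only runs when preprocessing is enabled
              use_meta_rules=True)           # opt in to the meta-rule recommendations
model.fit(features='train.csv', target='target')  # placeholders for a real dataset
```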