Skip to content

Commit

Permalink
feat(Harmonization): Add heterogeneity scores
Browse files Browse the repository at this point in the history
  • Loading branch information
Rapsssito committed Nov 24, 2023
1 parent d4b9686 commit 91c2907
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 52 deletions.
69 changes: 40 additions & 29 deletions modules/oncoliner_harmonization/src/harmonizator/harmonizator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,24 @@
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import numpy as np

# Add vcf-ops to the path
sys.path.insert(0, os.path.join(os.path.abspath(os.path.dirname(__file__)), '..', '..', '..', '..', 'shared', 'vcf_ops', 'src'))

from vcf_ops.metrics import filter_metrics_recommendations # noqa

from .utils import cleanup_text # noqa
from .heterogeneity_metrics import compute_h_score, compute_gdr # noqa


def _read_pipeline_improvements(pipeline_folder: str) -> Dict[str, pd.DataFrame]:
# Read all .csv files in the pipeline folder
csv_files = glob.glob(os.path.join(pipeline_folder, 'improvement_list', '*.csv'))
if len(csv_files) == 0:
raise Exception(f'No improvement files found in {pipeline_folder}')
# Get the first row of the first file to get the column names
first_row = pd.read_csv(csv_files[0], nrows=1)
# Get the columns and remove all those that end with _genes
selected_columns = [column for column in first_row.columns if not column.endswith('_genes')]
# Read all the improvements
pipeline_improvements = [pd.read_csv(csv_file, usecols=selected_columns) for csv_file in csv_files]
pipeline_improvements = [pd.read_csv(csv_file) for csv_file in csv_files]
# Concatenate all the improvements into a single dataframe
pipeline_improvements = pd.concat(pipeline_improvements, ignore_index=True)
# Create a dict with the variant_type and variant_size
Expand All @@ -43,7 +41,6 @@ def _get_pipelines_combinations(pipelines_folders: List[str], threads: int) -> D
# Each pipeline folder contains a list of .csv files with its possible improvements
# Read all of them and concatenate them into a single dataframe
# Create a dictionary with the pipeline name as key and the improvements dataframe as value
pipelines_combinations = OrderedDict()
pipeline_names = sorted([os.path.basename(pipeline_folder) for pipeline_folder in pipelines_folders])
# Make sure the pipeline names are unique
if len(pipeline_names) != len(set(pipeline_names)):
Expand All @@ -52,22 +49,45 @@ def _get_pipelines_combinations(pipelines_folders: List[str], threads: int) -> D
futures = []
for pipeline_folder in pipelines_folders:
futures.append(pool.submit(_read_pipeline_improvements, pipeline_folder))
# Initialize the dictionary with the pipeline names sorted
pipelines_combinations = OrderedDict(zip(pipeline_names, [None] * len(pipeline_names)))
for future in as_completed(futures):
pipeline_improvements, pipeline_folder = future.result()
pipeline_name = os.path.basename(pipeline_folder)
pipelines_combinations[pipeline_name] = pipeline_improvements
return pipelines_combinations


def _filter_combinations(pipelines_names: List[str], df: pd.DataFrame, num_callers_column: str, ranking_columns: List[str], loss_margin: float, max_recommendations: int) -> Set[str]:

def _filter_combinations(pipelines_names: List[str], df: pd.DataFrame, num_callers_column: str, ranking_columns: List[str], priority_columns: List[str], loss_margin: float, max_recommendations: int) -> Set[str]:
# Create a new column with the concatenation of the operations
df = df.assign(operation=lambda x: x[pipelines_names].apply(lambda y: ';'.join(y), axis=1))
filtered_df = filter_metrics_recommendations(df, loss_margin, max_recommendations, num_callers_column, ranking_columns)
filtered_df = filter_metrics_recommendations(df, loss_margin, max_recommendations, num_callers_column, ranking_columns, priority_columns)
result = set(filtered_df['operation'].unique())
return result


def _compute_metrics(operations_combination, improvements_combinations_metrics, variant_type_size, metrics_name):
computable_metrics = []
for op in operations_combination:
pipeline_name, operation = op.split(';')
metrics = improvements_combinations_metrics[pipeline_name][variant_type_size]
# Get the metrics for the current operation
metrics = metrics[metrics['operation'] == operation].iloc[0]
# Calculate the metrics for the current operation
computable_metrics.append(metrics)
# Calculate the metrics for the combination
if metrics_name == 'h_score':
return compute_h_score([(metrics['recall'], metrics['precision']) for metrics in computable_metrics])
elif metrics_name == 'gdr':
return compute_gdr([metrics['protein_affected_genes'] for metrics in computable_metrics])
elif metrics_name.endswith('_avg'):
return np.mean([metrics[metrics_name.replace('_avg', '')] for metrics in computable_metrics])
elif metrics_name.endswith('_sum'):
return np.sum([metrics[metrics_name.replace('_sum', '')] for metrics in computable_metrics])
else:
raise Exception(f'Unknown metric {metrics_name}')


def _group_combinations(improvements_combinations_metrics: Dict[str, Dict[str, pd.DataFrame]], loss_margin: float, max_recommendations: int) -> Dict[str, pd.DataFrame]:
first_pipeline_improvements = list(improvements_combinations_metrics.values())[0]
pipeline_names = list(improvements_combinations_metrics.keys())
Expand Down Expand Up @@ -96,29 +116,20 @@ def _group_combinations(improvements_combinations_metrics: Dict[str, Dict[str, p
combinations_row[pipeline_name] = operation
combinations_row['variant_type'] = variant_type_size.split(';')[0]
combinations_row['variant_size'] = variant_type_size.split(';')[1]
combinations_row['recall_avg'] = 0
combinations_row['precision_avg'] = 0
combinations_row['f1_score_avg'] = 0
combinations_row['protein_affected_genes_count_avg'] = 0
combinations_row['protein_affected_driver_genes_count_avg'] = 0
combinations_row['added_callers_sum'] = 0
# Calculate the metrics for each pipeline
for op in operations_combination:
pipeline_name, operation = op.split(';')
metrics = improvements_combinations_metrics[pipeline_name][variant_type_size]
# Get the metrics for the current operation
metrics = metrics[metrics['operation'] == operation].iloc[0]
# Calculate the metrics for the current operation
for combinations_row_key in combinations_row.keys():
if combinations_row_key.endswith('_avg'):
combinations_row[combinations_row_key] += \
metrics[combinations_row_key.replace('_avg', '')] / len(operations_combination)
elif combinations_row_key.endswith('_sum'):
combinations_row[combinations_row_key] += metrics[combinations_row_key.replace('_sum', '')]
computable_metrics = ['h_score', 'recall_avg', 'precision_avg', 'f1_score_avg', 'gdr',
'protein_affected_genes_count_avg', 'protein_affected_driver_genes_count_avg', 'protein_affected_actionable_genes_count_avg',
'added_callers_sum']
for metrics_name in computable_metrics:
combinations_row[metrics_name] = _compute_metrics(operations_combination, improvements_combinations_metrics, variant_type_size, metrics_name)
combinations_rows.append(combinations_row)
# Create a dataframe with the metrics
combinations_df = pd.DataFrame(combinations_rows)
selected_combinations.update(_filter_combinations(pipeline_names, combinations_df, 'added_callers_sum', ['f1_score_avg', 'recall_avg', 'precision_avg'], loss_margin, max_recommendations))
# Invert values for h_score and gdr
combinations_df['h_score_inv'] = - combinations_df['h_score']
combinations_df['gdr_inv'] = - combinations_df['gdr']
selected_combinations.update(_filter_combinations(pipeline_names, combinations_df, 'added_callers_sum', ['h_score_inv', 'gdr_inv'], ['f1_score_avg', 'recall_avg', 'precision_avg'], loss_margin, max_recommendations))
# Drop the inverted columns
combinations_df.drop(columns=['h_score_inv', 'gdr_inv'], inplace=True)
result[variant_type_size] = combinations_df
# Filter the combinations
for variant_type_size, combinations_df in result.items():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import List, Tuple
import sys
import os

import pandas as pd
import numpy as np

# Add vcf-ops to the path
sys.path.insert(0, os.path.join(os.path.abspath(os.path.dirname(__file__)), '..', '..', '..', '..', 'shared', 'vcf_ops', 'src'))
from vcf_ops.genes import GENE_SPLIT_SYMBOL # noqa

def compute_h_score(metrics: List[Tuple[float, float]]) -> float:
# Compute the centroid
centroid = np.mean(metrics, axis=0)
# Compute the distance from the centroid to each point
distances = []
for point in metrics:
distances.append(np.linalg.norm(point - centroid))
# Compute the h-score
h_score = np.mean(distances)
return h_score


def compute_gdr(genes_list: List[str]) -> float:
intersect_genes = None
union_genes = None
for genes in genes_list:
if type(genes) == float:
continue
curr_genes = set(genes.split(GENE_SPLIT_SYMBOL))
if intersect_genes is None:
intersect_genes = curr_genes
union_genes = curr_genes
continue
intersect_genes = intersect_genes.intersection(curr_genes)
union_genes = union_genes.union(curr_genes)
if union_genes is None or len(union_genes) == 0:
return 0
return 1 - (len(intersect_genes) / len(union_genes))
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def compute_improvements(callers_folders, user_folder, results_output_folder, re
return improvement_list

def filter_operations(df: pd.DataFrame, loss_margin: float, max_recommendations: int):
filtered_df = filter_metrics_recommendations(df, loss_margin, max_recommendations, 'added_callers', ['f1_score', 'recall', 'precision'])
filtered_df = filter_metrics_recommendations(df, loss_margin, max_recommendations, 'added_callers')
result = set(filtered_df['operation'].unique())
return result

Expand Down
3 changes: 0 additions & 3 deletions oncoliner_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,8 @@ def check_samples_folders(pipeline_folder_path: str, sample_names: Set[str]) ->
"""
pipeline_samples = set(os.listdir(pipeline_folder_path))
missing_samples = sample_names - pipeline_samples
missing_pipeline_samples = pipeline_samples - sample_names
if missing_samples:
raise Exception(f'The following samples are missing in {pipeline_folder_path}: {missing_samples}')
if missing_pipeline_samples:
logging.warning(f'The following samples are missing from the config file: {missing_pipeline_samples}')


if __name__ == '__main__':
Expand Down
7 changes: 0 additions & 7 deletions shared/vcf_ops/setup.cfg

This file was deleted.

9 changes: 0 additions & 9 deletions shared/vcf_ops/src/vcf_ops/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +0,0 @@
# Copyright 2023 - Barcelona Supercomputing Center
# Author: Rodrigo Martín
# BSC Dual License

# Export VariantType from VariantExtractor
from variant_extractor.variants import VariantType

__version__ = '0.0.1'
__author__ = 'Rapsssito'
4 changes: 2 additions & 2 deletions shared/vcf_ops/src/vcf_ops/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ def aggregate_metrics(metrics_list):
agg_metrics = agg_metrics[METRICS_COLUMNS]
return agg_metrics

def filter_metrics_recommendations(df: pd.DataFrame, loss_margin: float, max_recommendations: int, num_callers_column='num_callers', ranking_columns=['f1_score', 'recall', 'precision']):
def filter_metrics_recommendations(df: pd.DataFrame, loss_margin: float, max_recommendations: int, num_callers_column='num_callers', ranking_columns=['f1_score', 'recall', 'precision'], priority_columns=['f1_score', 'recall', 'precision']):
if len(df) == 0:
return df
selected_dfs = []
for i, ranking_column in enumerate(ranking_columns):
sort_columns = [ranking_column] + ranking_columns[:i] + ranking_columns[i+1:]
sort_columns = [ranking_column] + priority_columns
# Filter all rows with the max element for each column - loss_margin
df_temp = df
for column in sort_columns:
Expand Down
2 changes: 1 addition & 1 deletion tools/pipeline_designer/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def execute_operations(operations: List[str], combinations_folder: str, evaluati


def filter_operations(df: pd.DataFrame, loss_margin: float, max_recommendations: int):
return filter_metrics_recommendations(df, loss_margin, max_recommendations, 'num_callers', ['f1_score', 'recall', 'precision'])['operation'].unique()
return filter_metrics_recommendations(df, loss_margin, max_recommendations, 'num_callers')['operation'].unique()


def write_improvement_lists(evaluation_callers_folders: List[str], loss_margin: float, max_recommendations: int, output_folder: str, processes: int):
Expand Down

0 comments on commit 91c2907

Please sign in to comment.